# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""TraceEventImporter imports TraceEvent-formatted data
into the provided model.
This is a port of the trace event importer from
https://code.google.com/p/trace-viewer/
"""

import collections
import copy

import telemetry.timeline.async_slice as tracing_async_slice
import telemetry.timeline.flow_event as tracing_flow_event
from telemetry.timeline import importer
from telemetry.timeline import memory_dump_event
from tracing.trace_data import trace_data as trace_data_module


class TraceEventTimelineImporter(importer.TimelineImporter):
  """Imports the CHROME_TRACE_PART traces of a TraceData into a model.

  ImportEvents() dispatches every raw event on its 'ph' (phase) field.
  Duration, complete, instant, sample, counter, mark and metadata events are
  written to the model immediately. Async, flow and object events are only
  queued; async and flow events are turned into slices by FinalizeImport().

  Event times ('ts', 'tts', 'dur', 'tdur') are divided by 1000.0 before being
  handed to the model.
  NOTE(review): presumably a microsecond -> millisecond conversion; confirm
  against the trace format.
  """

  def __init__(self, model, trace_data):
    """Collects the raw events and metadata records from |trace_data|.

    Args:
      model: The timeline model that will receive the imported events.
      trace_data: A trace_data_module.TraceData instance.
    """
    super(TraceEventTimelineImporter, self).__init__(
        model, trace_data, import_order=1)
    assert isinstance(trace_data, trace_data_module.TraceData)
    self._trace_data = trace_data

    # Events queued by ImportEvents() and consumed by FinalizeImport().
    self._all_async_events = []
    self._all_object_events = []
    self._all_flow_events = []
    self._all_memory_dumps_by_dump_id = collections.defaultdict(list)

    self._events = []
    self._metadata = []
    for trace in trace_data.GetTracesFor(trace_data_module.CHROME_TRACE_PART):
      self._events.extend(trace['traceEvents'])
      self.CollectMetadataRecords(trace)

  def CollectMetadataRecords(self, trace):
    """Records every top-level field of |trace| that is not a trace part.

    Each remaining key/value pair is appended to self._metadata as
    {'name': key, 'value': value}.

    Args:
      trace: A dict-like trace object.
    """
    part_field_names = {p.raw_field_name for p in
                        trace_data_module.ALL_TRACE_PARTS}
    # items() rather than iteritems() so this runs on Python 2 and 3.
    for field_name, field_value in trace.items():
      if field_name in part_field_names:
        continue
      self._metadata.append({'name': field_name, 'value': field_value})

  @staticmethod
  def GetSupportedPart():
    """Returns the trace part this importer consumes."""
    return trace_data_module.CHROME_TRACE_PART

  def _GetOrCreateProcess(self, pid):
    """Returns the model's process for |pid|, creating it if needed."""
    return self._model.GetOrCreateProcess(pid)

  def _GetOrCreateThreadForEvent(self, event):
    """Returns the model thread identified by the event's 'pid' and 'tid'."""
    return (self._GetOrCreateProcess(event['pid'])
            .GetOrCreateThread(event['tid']))

  def _DeepCopyIfNeeded(self, obj):
    """Returns |obj| itself if the trace data may be mutated, else a deep copy.
    """
    if self._trace_data.events_are_safely_mutable:
      return obj
    return copy.deepcopy(obj)

  def _ProcessAsyncEvent(self, event):
    """Queues an async event (ph: S, T, F, b or e) with its thread.

    The queued events are paired up into async slices later by
    _CreateAsyncSlices().
    """
    self._all_async_events.append({
        'event': event,
        'thread': self._GetOrCreateThreadForEvent(event)})

  def _ProcessCounterEvent(self, event):
    """Helper that creates and adds samples to a Counter object based on
    'C' phase events.
    """
    if 'id' in event:
      ctr_name = event['name'] + '[' + str(event['id']) + ']'
    else:
      ctr_name = event['name']

    ctr = (self._GetOrCreateProcess(event['pid'])
           .GetOrCreateCounter(event['cat'], ctr_name))
    # Initialize the counter's series fields if needed.
    if not ctr.series_names:
      #TODO: implement counter object
      for series_name in event['args']:
        ctr.series_names.append(series_name)
      if not ctr.series_names:
        self._model.import_errors.append(
            'Expected counter ' + event['name'] +
            ' to have at least one argument to use as a value.')
        # Drop the counter.
        del ctr.parent.counters[ctr.full_name]
        return

    # Add the sample values; a series missing from this event is sampled as 0.
    ctr.timestamps.append(event['ts'] / 1000.0)
    for series_name in ctr.series_names:
      ctr.samples.append(event['args'].get(series_name, 0))

  def _ProcessObjectEvent(self, event):
    """Queues an object event (ph: N, D or O) together with its thread."""
    self._all_object_events.append({
        'event': event,
        'thread': self._GetOrCreateThreadForEvent(event)})

  def _ProcessDurationEvent(self, event):
    """Opens (ph: B) or closes (ph: E) a slice on the event's thread.

    An import error is recorded, and the event dropped, when the thread
    rejects the timestamp or when an E event has no open slice to close.
    Arguments supplied by the E event are merged into the closed slice and
    override same-named arguments from the B event.
    """
    thread = self._GetOrCreateThreadForEvent(event)
    timestamp = event['ts'] / 1000.0
    if not thread.IsTimestampValidForBeginOrEnd(timestamp):
      self._model.import_errors.append(
          'Timestamps are moving backward.')
      return

    thread_timestamp = event['tts'] / 1000.0 if 'tts' in event else None
    if event['ph'] == 'B':
      # A missing 'args' becomes an empty dict so that the merge performed
      # for the matching E event always has a dict to write into.
      thread.BeginSlice(event['cat'],
                        event['name'],
                        timestamp,
                        thread_timestamp,
                        event.get('args', {}))
    elif event['ph'] == 'E':
      if not thread.open_slice_count:
        self._model.import_errors.append(
            'E phase event without a matching B phase event.')
        return

      new_slice = thread.EndSlice(timestamp, thread_timestamp)
      for arg_name, arg_value in event.get('args', {}).items():
        if arg_name in new_slice.args:
          self._model.import_errors.append(
              'Both the B and E phases of ' + new_slice.name +
              ' provided values for argument ' + arg_name + '. ' +
              'The value of the E phase event will be used.')
        new_slice.args[arg_name] = arg_value

  def _ProcessCompleteEvent(self, event):
    """Pushes a complete slice (ph: X) onto the event's thread.

    'dur', 'tts' and 'tdur' are optional and passed as None when absent; a
    missing 'args' is passed as an empty dict.
    """
    thread = self._GetOrCreateThreadForEvent(event)
    thread.PushCompleteSlice(
        event['cat'],
        event['name'],
        event['ts'] / 1000.0,
        event['dur'] / 1000.0 if 'dur' in event else None,
        event['tts'] / 1000.0 if 'tts' in event else None,
        event['tdur'] / 1000.0 if 'tdur' in event else None,
        event.get('args', {}))

  def _ProcessMarkEvent(self, event):
    """Pushes a mark slice (ph: R) onto the event's thread."""
    thread = self._GetOrCreateThreadForEvent(event)
    thread.PushMarkSlice(
        event['cat'],
        event['name'],
        event['ts'] / 1000.0,
        event['tts'] / 1000.0 if 'tts' in event else None,
        event.get('args'))

  def _ProcessMetadataEvent(self, event):
    """Applies a metadata event (ph: M) to the matching thread or process.

    Unknown metadata names are recorded as import errors.
    """
    name = event['name']
    if name == 'thread_name':
      thread = self._GetOrCreateThreadForEvent(event)
      thread.name = event['args']['name']
    elif name == 'process_name':
      process = self._GetOrCreateProcess(event['pid'])
      process.name = event['args']['name']
    elif name == 'process_labels':
      process = self._GetOrCreateProcess(event['pid'])
      process.labels = event['args']['labels']
    elif name == 'process_uptime_seconds':
      process = self._GetOrCreateProcess(event['pid'])
      process.uptime_seconds = event['args']['uptime']
    elif name == 'trace_buffer_overflowed':
      process = self._GetOrCreateProcess(event['pid'])
      process.SetTraceBufferOverflowTimestamp(event['args']['overflowed_at_ts'])
    else:
      self._model.import_errors.append(
          'Unrecognized metadata name: ' + name)

  def _ProcessInstantEvent(self, event):
    """Imports an instant event (ph: I or i)."""
    # Treat an Instant event as a duration 0 slice.
    # SliceTrack's redraw() knows how to handle this.
    thread = self._GetOrCreateThreadForEvent(event)
    timestamp = event['ts'] / 1000.0
    thread.BeginSlice(event['cat'],
                      event['name'],
                      timestamp,
                      args=event.get('args'))
    thread.EndSlice(timestamp)

  def _ProcessSampleEvent(self, event):
    """Adds a sample (ph: P) to the event's thread."""
    thread = self._GetOrCreateThreadForEvent(event)
    thread.AddSample(event['cat'],
                     event['name'],
                     event['ts'] / 1000.0,
                     event.get('args'))

  def _ProcessFlowEvent(self, event):
    """Queues a flow event (ph: s, t or f) for _CreateFlowSlices()."""
    self._all_flow_events.append({
        'event': event,
        'thread': self._GetOrCreateThreadForEvent(event)})

  def _ProcessMemoryDumpEvents(self, events):
    """Groups memory dump events (ph: v) by dump id and pid and imports them.

    One ProcessMemoryDumpEvent is created per (dump id, pid) pair, added to
    its process, and remembered under its dump id for _CreateMemoryDumps().
    """
    # Dictionary to order dumps by id and process.
    global_dumps = {}
    for event in events:
      global_dump = global_dumps.setdefault(event['id'], {})
      dump_events = global_dump.setdefault(event['pid'], [])
      dump_events.append(event)
    for dump_id, global_dump in global_dumps.items():
      for pid, dump_events in global_dump.items():
        process = self._GetOrCreateProcess(pid)
        memory_dump = memory_dump_event.ProcessMemoryDumpEvent(process,
                                                               dump_events)
        process.AddMemoryDumpEvent(memory_dump)
        self._all_memory_dumps_by_dump_id[dump_id].append(memory_dump)

  def ImportEvents(self):
    """Walks through self._events and outputs the structures discovered to
    self._model.

    Returns:
      The model that was passed to the constructor.
    """
    for record in self._metadata:
      self._model.metadata.append(record)
    memory_dump_events = []
    for event in self._events:
      phase = event.get('ph')
      if phase in ('B', 'E'):
        self._ProcessDurationEvent(event)
      elif phase == 'X':
        self._ProcessCompleteEvent(event)
      # Note, S, F, T are deprecated and replaced by 'b' and 'e'. For
      # backwards compatibility continue to support them here.
      elif phase in ('S', 'F', 'T', 'b', 'e'):
        self._ProcessAsyncEvent(event)
      # Note, I is historic. The instant event marker got changed, but we
      # want to support loading old trace files so we have both I and i.
      elif phase in ('I', 'i'):
        self._ProcessInstantEvent(event)
      elif phase == 'P':
        self._ProcessSampleEvent(event)
      elif phase == 'C':
        self._ProcessCounterEvent(event)
      elif phase == 'M':
        self._ProcessMetadataEvent(event)
      elif phase in ('N', 'D', 'O'):
        self._ProcessObjectEvent(event)
      elif phase in ('s', 't', 'f'):
        self._ProcessFlowEvent(event)
      elif phase == 'v':
        memory_dump_events.append(event)
      elif phase == 'R':
        self._ProcessMarkEvent(event)
      else:
        # %-formatting instead of '+' so that an event with no 'ph' or no
        # 'name' is reported as an import error rather than raising.
        self._model.import_errors.append(
            'Unrecognized event phase: %s(%s)' % (phase, event.get('name')))

    # Memory dumps of a process with the same dump id need to be merged before
    # processing. So, memory dump events are processed all at once.
    self._ProcessMemoryDumpEvents(memory_dump_events)
    return self._model

  def FinalizeImport(self):
    """Called by the Model after all other importers have imported their
    events."""
    self._model.UpdateBounds()

    # We need to reupdate the bounds in case the minimum start time changes
    # NOTE(review): nothing visible here changes the model between the two
    # calls, so the second one looks redundant; kept as-is because
    # UpdateBounds() is defined elsewhere.
    self._model.UpdateBounds()
    self._CreateAsyncSlices()
    self._CreateFlowSlices()
    self._SetBrowserProcess()
    self._SetGpuProcess()
    self._CreateExplicitObjects()
    self._CreateImplicitObjects()
    self._CreateMemoryDumps()

  def _CreateAsyncSlices(self):
    """Pairs the queued async events into AsyncSlices.

    Events are processed in 'ts' order. A start event (ph: S or b) opens a
    slice keyed by (name, id); step and finish events are appended to the
    open slice with the same key; a finish event (ph: F or e) builds the
    AsyncSlice, adds it to its start thread and closes the key. Malformed or
    unmatched events are recorded as import errors and skipped.
    """
    if not self._all_async_events:
      return

    self._all_async_events.sort(key=lambda x: x['event']['ts'])

    # name -> id -> list of event states of the currently open slice.
    open_states_by_name_then_id = {}
    import_errors = self._model.import_errors

    for async_event_state in self._all_async_events:
      event = async_event_state['event']
      name = event.get('name')
      if name is None:
        import_errors.append(
            'Async events (ph: b, e, S, T or F) require an name parameter.')
        continue

      event_id = event.get('id')
      if event_id is None:
        import_errors.append(
            'Async events (ph: b, e, S, T or F) require an id parameter.')
        continue

      # TODO(simonjam): Add a synchronous tick on the appropriate thread.

      if event['ph'] in ('S', 'b'):
        open_states_by_id = open_states_by_name_then_id.setdefault(name, {})
        if event_id in open_states_by_id:
          import_errors.append(
              'At %d, a slice of the same id %s was already open.' % (
                  event['ts'], event_id))
          continue
        open_states_by_id[event_id] = [async_event_state]
        continue

      if name not in open_states_by_name_then_id:
        import_errors.append(
            'At %d, no slice named %s was open.' % (event['ts'], name,))
        continue
      open_states_by_id = open_states_by_name_then_id[name]
      if event_id not in open_states_by_id:
        import_errors.append(
            'At %d, no slice named %s with id=%s was open.' % (
                event['ts'], name, event_id))
        continue
      events = open_states_by_id[event_id]
      events.append(async_event_state)

      if event['ph'] in ('F', 'e'):
        async_slice = self._BuildAsyncSlice(name, event_id, events)
        # Add |async_slice| to the start-thread's async_slices.
        async_slice.start_thread.AddAsyncSlice(async_slice)
        del open_states_by_id[event_id]

  def _BuildAsyncSlice(self, name, event_id, events):
    """Builds the AsyncSlice, with one sub-slice per step, for a closed flow.

    Args:
      name: The name shared by all events of the slice.
      event_id: The id shared by all events of the slice.
      events: The {'event', 'thread'} states of the slice in timestamp order;
          the first is the start event and the last is the finish event.

    Returns:
      The new AsyncSlice. It has not been added to any thread.
    """
    begin_state = events[0]
    end_state = events[-1]
    begin_event = begin_state['event']
    end_event = end_state['event']

    # Create a slice from start to end.
    async_slice = tracing_async_slice.AsyncSlice(
        begin_event['cat'],
        name,
        begin_event['ts'] / 1000.0)
    async_slice.duration = ((end_event['ts'] / 1000.0)
                            - (begin_event['ts'] / 1000.0))
    async_slice.start_thread = begin_state['thread']
    async_slice.end_thread = end_state['thread']
    # Thread times are only set when both ends ran on the same thread and
    # both carry a 'tts'.
    if async_slice.start_thread == async_slice.end_thread:
      if 'tts' in end_event and 'tts' in begin_event:
        async_slice.thread_start = begin_event['tts'] / 1000.0
        async_slice.thread_duration = ((end_event['tts'] / 1000.0)
                                       - (begin_event['tts'] / 1000.0))
    async_slice.id = event_id
    # When the event carries an 'args' dict it is used directly, not copied.
    async_slice.args = begin_event.get('args', {})

    # Create sub_slices for each step, i.e. each pair of consecutive events.
    for prev_state, next_state in zip(events, events[1:]):
      prev_event = prev_state['event']
      next_event = next_state['event']
      sub_name = name
      if prev_event['ph'] == 'T':
        sub_name = name + ':' + prev_event['args']['step']
      sub_slice = tracing_async_slice.AsyncSlice(
          begin_event['cat'],
          sub_name,
          prev_event['ts'] / 1000.0)
      sub_slice.parent_slice = async_slice

      sub_slice.duration = ((next_event['ts'] / 1000.0)
                            - (prev_event['ts'] / 1000.0))

      sub_slice.start_thread = prev_state['thread']
      sub_slice.end_thread = next_state['thread']
      if sub_slice.start_thread == sub_slice.end_thread:
        if 'tts' in next_event and 'tts' in prev_event:
          sub_slice.thread_duration = ((next_event['tts'] / 1000.0)
                                       - (prev_event['tts'] / 1000.0))

      sub_slice.id = event_id
      sub_slice.args = prev_event.get('args', {})

      async_slice.AddSubSlice(sub_slice)

    # The args for the finish event go in the last sub_slice. That sub_slice's
    # args is the preceding event's own 'args' dict when it has one, so this
    # writes into that dict.
    last_slice = async_slice.sub_slices[-1]
    for arg_name, arg_value in end_event.get('args', {}).items():
      last_slice.args[arg_name] = arg_value

    return async_slice

  def _CreateExplicitObjects(self):
    # TODO(tengs): Implement object instance parsing
    pass

  def _CreateImplicitObjects(self):
    # TODO(tengs): Implement object instance parsing
    pass

  def _CreateFlowSlices(self):
    """Turns the queued flow events into FlowEvents and links them up.

    Every well-formed event is added to its thread as a FlowEvent. For each
    step (ph: t) or end (ph: f) event, a [previous, current] pair is appended
    to the model's flow_events, where "previous" is the most recent event
    seen with the same id. Malformed or unmatched events are recorded as
    import errors.
    """
    if not self._all_flow_events:
      return

    self._all_flow_events.sort(key=lambda x: x['event']['ts'])

    # Flow id -> most recent FlowEvent of the still-open flow with that id.
    flow_id_to_event = {}
    for data in self._all_flow_events:
      event = data['event']
      thread = data['thread']
      if 'name' not in event:
        self._model.import_errors.append(
            'Flow events (ph: s, t or f) require a name parameter.')
        continue
      if 'id' not in event:
        self._model.import_errors.append(
            'Flow events (ph: s, t or f) require an id parameter.')
        continue

      flow_id = event['id']
      phase = event['ph']
      flow_event = tracing_flow_event.FlowEvent(
          event['cat'],
          flow_id,
          event['name'],
          event['ts'] / 1000.0,
          event.get('args', {}))
      thread.AddFlowEvent(flow_event)

      if phase == 's':
        if flow_id in flow_id_to_event:
          self._model.import_errors.append(
              'event id %s already seen when encountering start of '
              'flow event.' % flow_id)
          continue
        flow_id_to_event[flow_id] = flow_event
      elif phase in ('t', 'f'):
        if flow_id not in flow_id_to_event:
          self._model.import_errors.append(
              'Found flow phase %s for id: %s but no flow start found.' % (
                  phase, flow_id))
          continue
        flow_position = flow_id_to_event[flow_id]
        self._model.flow_events.append([flow_position, flow_event])

        if phase == 'f':
          del flow_id_to_event[flow_id]
        else:
          # Make this event the next start event in this flow.
          flow_id_to_event[flow_id] = flow_event

  def _CreateMemoryDumps(self):
    """Hands the model one GlobalMemoryDump per collected dump id."""
    self._model.SetGlobalMemoryDumps(
        memory_dump_event.GlobalMemoryDump(events)
        for events in self._all_memory_dumps_by_dump_id.values())

  def _SetBrowserProcess(self):
    """Sets the model's browser process to the parent of 'CrBrowserMain'."""
    for thread in self._model.GetAllThreads():
      if thread.name == 'CrBrowserMain':
        self._model.browser_process = thread.parent

  def _SetGpuProcess(self):
    """Sets the model's GPU process to the parent of 'CrGpuMain'."""
    for thread in self._model.GetAllThreads():
      if thread.name == 'CrGpuMain':
        self._model.gpu_process = thread.parent