1# Copyright 2014 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4"""TraceEventImporter imports TraceEvent-formatted data 5into the provided model. 6This is a port of the trace event importer from 7https://code.google.com/p/trace-viewer/ 8""" 9 10import collections 11import copy 12 13import telemetry.timeline.async_slice as tracing_async_slice 14import telemetry.timeline.flow_event as tracing_flow_event 15from telemetry.timeline import importer 16from telemetry.timeline import memory_dump_event 17from telemetry.timeline import trace_data as trace_data_module 18 19 20class TraceEventTimelineImporter(importer.TimelineImporter): 21 def __init__(self, model, trace_data): 22 super(TraceEventTimelineImporter, self).__init__( 23 model, trace_data, import_order=1) 24 assert isinstance(trace_data, trace_data_module.TraceData) 25 self._trace_data = trace_data 26 27 self._all_async_events = [] 28 self._all_object_events = [] 29 self._all_flow_events = [] 30 self._all_memory_dumps_by_dump_id = collections.defaultdict(list) 31 32 self._events = trace_data.GetEventsFor(trace_data_module.CHROME_TRACE_PART) 33 34 @staticmethod 35 def GetSupportedPart(): 36 return trace_data_module.CHROME_TRACE_PART 37 38 def _GetOrCreateProcess(self, pid): 39 return self._model.GetOrCreateProcess(pid) 40 41 def _DeepCopyIfNeeded(self, obj): 42 if self._trace_data.events_are_safely_mutable: 43 return obj 44 return copy.deepcopy(obj) 45 46 def _ProcessAsyncEvent(self, event): 47 """Helper to process an 'async finish' event, which will close an 48 open slice. 49 """ 50 thread = (self._GetOrCreateProcess(event['pid']) 51 .GetOrCreateThread(event['tid'])) 52 self._all_async_events.append({ 53 'event': event, 54 'thread': thread}) 55 56 def _ProcessCounterEvent(self, event): 57 """Helper that creates and adds samples to a Counter object based on 58 'C' phase events. 59 """ 60 if 'id' in event: 61 ctr_name = event['name'] + '[' + str(event['id']) + ']' 62 else: 63 ctr_name = event['name'] 64 65 ctr = (self._GetOrCreateProcess(event['pid']) 66 .GetOrCreateCounter(event['cat'], ctr_name)) 67 # Initialize the counter's series fields if needed. 68 if len(ctr.series_names) == 0: 69 #TODO: implement counter object 70 for series_name in event['args']: 71 ctr.series_names.append(series_name) 72 if len(ctr.series_names) == 0: 73 self._model.import_errors.append('Expected counter ' + event['name'] + 74 ' to have at least one argument to use as a value.') 75 # Drop the counter. 76 del ctr.parent.counters[ctr.full_name] 77 return 78 79 # Add the sample values. 80 ctr.timestamps.append(event['ts'] / 1000.0) 81 for series_name in ctr.series_names: 82 if series_name not in event['args']: 83 ctr.samples.append(0) 84 continue 85 ctr.samples.append(event['args'][series_name]) 86 87 def _ProcessObjectEvent(self, event): 88 thread = (self._GetOrCreateProcess(event['pid']) 89 .GetOrCreateThread(event['tid'])) 90 self._all_object_events.append({ 91 'event': event, 92 'thread': thread}) 93 94 def _ProcessDurationEvent(self, event): 95 thread = (self._GetOrCreateProcess(event['pid']) 96 .GetOrCreateThread(event['tid'])) 97 if not thread.IsTimestampValidForBeginOrEnd(event['ts'] / 1000.0): 98 self._model.import_errors.append( 99 'Timestamps are moving backward.') 100 return 101 102 if event['ph'] == 'B': 103 thread.BeginSlice(event['cat'], 104 event['name'], 105 event['ts'] / 1000.0, 106 event['tts'] / 1000.0 if 'tts' in event else None, 107 event['args']) 108 elif event['ph'] == 'E': 109 thread = (self._GetOrCreateProcess(event['pid']) 110 .GetOrCreateThread(event['tid'])) 111 if not thread.IsTimestampValidForBeginOrEnd(event['ts'] / 1000.0): 112 self._model.import_errors.append( 113 'Timestamps are moving backward.') 114 return 115 if not thread.open_slice_count: 116 self._model.import_errors.append( 117 'E phase event without a matching B phase event.') 118 return 119 120 new_slice = thread.EndSlice( 121 event['ts'] / 1000.0, 122 event['tts'] / 1000.0 if 'tts' in event else None) 123 for arg_name, arg_value in event.get('args', {}).iteritems(): 124 if arg_name in new_slice.args: 125 self._model.import_errors.append( 126 'Both the B and E phases of ' + new_slice.name + 127 ' provided values for argument ' + arg_name + '. ' + 128 'The value of the E phase event will be used.') 129 new_slice.args[arg_name] = arg_value 130 131 def _ProcessCompleteEvent(self, event): 132 thread = (self._GetOrCreateProcess(event['pid']) 133 .GetOrCreateThread(event['tid'])) 134 thread.PushCompleteSlice( 135 event['cat'], 136 event['name'], 137 event['ts'] / 1000.0, 138 event['dur'] / 1000.0 if 'dur' in event else None, 139 event['tts'] / 1000.0 if 'tts' in event else None, 140 event['tdur'] / 1000.0 if 'tdur' in event else None, 141 event['args']) 142 143 def _ProcessMarkEvent(self, event): 144 thread = (self._GetOrCreateProcess(event['pid']) 145 .GetOrCreateThread(event['tid'])) 146 thread.PushMarkSlice( 147 event['cat'], 148 event['name'], 149 event['ts'] / 1000.0, 150 event['tts'] / 1000.0 if 'tts' in event else None, 151 event['args'] if 'args' in event else None) 152 153 def _ProcessMetadataEvent(self, event): 154 if event['name'] == 'thread_name': 155 thread = (self._GetOrCreateProcess(event['pid']) 156 .GetOrCreateThread(event['tid'])) 157 thread.name = event['args']['name'] 158 elif event['name'] == 'process_name': 159 process = self._GetOrCreateProcess(event['pid']) 160 process.name = event['args']['name'] 161 elif event['name'] == 'process_labels': 162 process = self._GetOrCreateProcess(event['pid']) 163 process.labels = event['args']['labels'] 164 elif event['name'] == 'trace_buffer_overflowed': 165 process = self._GetOrCreateProcess(event['pid']) 166 process.SetTraceBufferOverflowTimestamp(event['args']['overflowed_at_ts']) 167 else: 168 self._model.import_errors.append( 169 'Unrecognized metadata name: ' + event['name']) 170 171 def _ProcessInstantEvent(self, event): 172 # Treat an Instant event as a duration 0 slice. 173 # SliceTrack's redraw() knows how to handle this. 174 thread = (self._GetOrCreateProcess(event['pid']) 175 .GetOrCreateThread(event['tid'])) 176 thread.BeginSlice(event['cat'], 177 event['name'], 178 event['ts'] / 1000.0, 179 args=event.get('args')) 180 thread.EndSlice(event['ts'] / 1000.0) 181 182 def _ProcessSampleEvent(self, event): 183 thread = (self._GetOrCreateProcess(event['pid']) 184 .GetOrCreateThread(event['tid'])) 185 thread.AddSample(event['cat'], 186 event['name'], 187 event['ts'] / 1000.0, 188 event.get('args')) 189 190 def _ProcessFlowEvent(self, event): 191 thread = (self._GetOrCreateProcess(event['pid']) 192 .GetOrCreateThread(event['tid'])) 193 self._all_flow_events.append({ 194 'event': event, 195 'thread': thread}) 196 197 def _ProcessMemoryDumpEvents(self, events): 198 # Dictionary to order dumps by id and process. 199 global_dumps = {} 200 for event in events: 201 global_dump = global_dumps.setdefault(event['id'], {}) 202 dump_events = global_dump.setdefault(event['pid'], []) 203 dump_events.append(event) 204 for dump_id, global_dump in global_dumps.iteritems(): 205 for pid, dump_events in global_dump.iteritems(): 206 process = self._GetOrCreateProcess(pid) 207 memory_dump = memory_dump_event.ProcessMemoryDumpEvent(process, 208 dump_events) 209 process.AddMemoryDumpEvent(memory_dump) 210 self._all_memory_dumps_by_dump_id[dump_id].append(memory_dump) 211 212 def ImportEvents(self): 213 """Walks through the events_ list and outputs the structures discovered to 214 model_. 215 """ 216 memory_dump_events = [] 217 for event in self._events: 218 phase = event.get('ph', None) 219 if phase == 'B' or phase == 'E': 220 self._ProcessDurationEvent(event) 221 elif phase == 'X': 222 self._ProcessCompleteEvent(event) 223 # Note, S, F, T are deprecated and replaced by 'b' and 'e'. For 224 # backwards compatibility continue to support them here. 225 elif phase == 'S' or phase == 'F' or phase == 'T': 226 self._ProcessAsyncEvent(event) 227 elif phase == 'b' or phase == 'e': 228 self._ProcessAsyncEvent(event) 229 # Note, I is historic. The instant event marker got changed, but we 230 # want to support loading old trace files so we have both I and i. 231 elif phase == 'I' or phase == 'i': 232 self._ProcessInstantEvent(event) 233 elif phase == 'P': 234 self._ProcessSampleEvent(event) 235 elif phase == 'C': 236 self._ProcessCounterEvent(event) 237 elif phase == 'M': 238 self._ProcessMetadataEvent(event) 239 elif phase == 'N' or phase == 'D' or phase == 'O': 240 self._ProcessObjectEvent(event) 241 elif phase == 's' or phase == 't' or phase == 'f': 242 self._ProcessFlowEvent(event) 243 elif phase == 'v': 244 memory_dump_events.append(event) 245 elif phase == 'R': 246 self._ProcessMarkEvent(event) 247 else: 248 self._model.import_errors.append('Unrecognized event phase: ' + 249 phase + '(' + event['name'] + ')') 250 251 # Memory dumps of a process with the same dump id need to be merged before 252 # processing. So, memory dump events are processed all at once. 253 self._ProcessMemoryDumpEvents(memory_dump_events) 254 return self._model 255 256 def FinalizeImport(self): 257 """Called by the Model after all other importers have imported their 258 events.""" 259 self._model.UpdateBounds() 260 261 # We need to reupdate the bounds in case the minimum start time changes 262 self._model.UpdateBounds() 263 self._CreateAsyncSlices() 264 self._CreateFlowSlices() 265 self._SetBrowserProcess() 266 self._SetGpuProcess() 267 self._CreateExplicitObjects() 268 self._CreateImplicitObjects() 269 self._CreateMemoryDumps() 270 271 def _CreateAsyncSlices(self): 272 if len(self._all_async_events) == 0: 273 return 274 275 self._all_async_events.sort(key=lambda x: x['event']['ts']) 276 277 async_event_states_by_name_then_id = {} 278 279 all_async_events = self._all_async_events 280 for async_event_state in all_async_events: 281 event = async_event_state['event'] 282 name = event.get('name', None) 283 if name is None: 284 self._model.import_errors.append( 285 'Async events (ph: b, e, S, T or F) require an name parameter.') 286 continue 287 288 event_id = event.get('id') 289 if event_id is None: 290 self._model.import_errors.append( 291 'Async events (ph: b, e, S, T or F) require an id parameter.') 292 continue 293 294 # TODO(simonjam): Add a synchronous tick on the appropriate thread. 295 296 if event['ph'] == 'S' or event['ph'] == 'b': 297 if not name in async_event_states_by_name_then_id: 298 async_event_states_by_name_then_id[name] = {} 299 if event_id in async_event_states_by_name_then_id[name]: 300 self._model.import_errors.append( 301 'At %d, a slice of the same id %s was already open.' % ( 302 event['ts'], event_id)) 303 continue 304 305 async_event_states_by_name_then_id[name][event_id] = [] 306 async_event_states_by_name_then_id[name][event_id].append( 307 async_event_state) 308 else: 309 if name not in async_event_states_by_name_then_id: 310 self._model.import_errors.append( 311 'At %d, no slice named %s was open.' % (event['ts'], name,)) 312 continue 313 if event_id not in async_event_states_by_name_then_id[name]: 314 self._model.import_errors.append( 315 'At %d, no slice named %s with id=%s was open.' % ( 316 event['ts'], name, event_id)) 317 continue 318 events = async_event_states_by_name_then_id[name][event_id] 319 events.append(async_event_state) 320 321 if event['ph'] == 'F' or event['ph'] == 'e': 322 # Create a slice from start to end. 323 async_slice = tracing_async_slice.AsyncSlice( 324 events[0]['event']['cat'], 325 name, 326 events[0]['event']['ts'] / 1000.0) 327 328 async_slice.duration = ((event['ts'] / 1000.0) 329 - (events[0]['event']['ts'] / 1000.0)) 330 331 async_slice.start_thread = events[0]['thread'] 332 async_slice.end_thread = async_event_state['thread'] 333 if async_slice.start_thread == async_slice.end_thread: 334 if 'tts' in event and 'tts' in events[0]['event']: 335 async_slice.thread_start = events[0]['event']['tts'] / 1000.0 336 async_slice.thread_duration = ((event['tts'] / 1000.0) 337 - (events[0]['event']['tts'] / 1000.0)) 338 async_slice.id = event_id 339 async_slice.args = events[0]['event']['args'] 340 341 # Create sub_slices for each step. 342 for j in xrange(1, len(events)): 343 sub_name = name 344 if events[j - 1]['event']['ph'] == 'T': 345 sub_name = name + ':' + events[j - 1]['event']['args']['step'] 346 sub_slice = tracing_async_slice.AsyncSlice( 347 events[0]['event']['cat'], 348 sub_name, 349 events[j - 1]['event']['ts'] / 1000.0) 350 sub_slice.parent_slice = async_slice 351 352 sub_slice.duration = ((events[j]['event']['ts'] / 1000.0) 353 - (events[j - 1]['event']['ts'] / 1000.0)) 354 355 sub_slice.start_thread = events[j - 1]['thread'] 356 sub_slice.end_thread = events[j]['thread'] 357 if sub_slice.start_thread == sub_slice.end_thread: 358 if 'tts' in events[j]['event'] and \ 359 'tts' in events[j - 1]['event']: 360 sub_slice.thread_duration = \ 361 ((events[j]['event']['tts'] / 1000.0) 362 - (events[j - 1]['event']['tts'] / 1000.0)) 363 364 sub_slice.id = event_id 365 sub_slice.args = events[j - 1]['event']['args'] 366 367 async_slice.AddSubSlice(sub_slice) 368 369 # The args for the finish event go in the last sub_slice. 370 last_slice = async_slice.sub_slices[-1] 371 for arg_name, arg_value in event['args'].iteritems(): 372 last_slice.args[arg_name] = arg_value 373 374 # Add |async_slice| to the start-thread's async_slices. 375 async_slice.start_thread.AddAsyncSlice(async_slice) 376 del async_event_states_by_name_then_id[name][event_id] 377 378 def _CreateExplicitObjects(self): 379 # TODO(tengs): Implement object instance parsing 380 pass 381 382 def _CreateImplicitObjects(self): 383 # TODO(tengs): Implement object instance parsing 384 pass 385 386 def _CreateFlowSlices(self): 387 if len(self._all_flow_events) == 0: 388 return 389 390 self._all_flow_events.sort(key=lambda x: x['event']['ts']) 391 392 flow_id_to_event = {} 393 for data in self._all_flow_events: 394 event = data['event'] 395 thread = data['thread'] 396 if 'name' not in event: 397 self._model.import_errors.append( 398 'Flow events (ph: s, t or f) require a name parameter.') 399 continue 400 if 'id' not in event: 401 self._model.import_errors.append( 402 'Flow events (ph: s, t or f) require an id parameter.') 403 continue 404 405 flow_event = tracing_flow_event.FlowEvent( 406 event['cat'], 407 event['id'], 408 event['name'], 409 event['ts'] / 1000.0, 410 event['args']) 411 thread.AddFlowEvent(flow_event) 412 413 if event['ph'] == 's': 414 if event['id'] in flow_id_to_event: 415 self._model.import_errors.append( 416 'event id %s already seen when encountering start of' 417 'flow event.' % event['id']) 418 continue 419 flow_id_to_event[event['id']] = flow_event 420 elif event['ph'] == 't' or event['ph'] == 'f': 421 if not event['id'] in flow_id_to_event: 422 self._model.import_errors.append( 423 'Found flow phase %s for id: %s but no flow start found.' % ( 424 event['ph'], event['id'])) 425 continue 426 flow_position = flow_id_to_event[event['id']] 427 self._model.flow_events.append([flow_position, flow_event]) 428 429 if event['ph'] == 'f': 430 del flow_id_to_event[event['id']] 431 else: 432 # Make this event the next start event in this flow. 433 flow_id_to_event[event['id']] = flow_event 434 435 def _CreateMemoryDumps(self): 436 self._model.SetGlobalMemoryDumps( 437 memory_dump_event.GlobalMemoryDump(events) 438 for events in self._all_memory_dumps_by_dump_id.itervalues()) 439 440 def _SetBrowserProcess(self): 441 for thread in self._model.GetAllThreads(): 442 if thread.name == 'CrBrowserMain': 443 self._model.browser_process = thread.parent 444 445 def _SetGpuProcess(self): 446 for thread in self._model.GetAllThreads(): 447 if thread.name == 'CrGpuMain': 448 self._model.gpu_process = thread.parent 449