1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""TraceEventImporter imports TraceEvent-formatted data
5into the provided model.
6This is a port of the trace event importer from
7https://code.google.com/p/trace-viewer/
8"""
9
10import collections
11import copy
12
13import telemetry.timeline.async_slice as tracing_async_slice
14import telemetry.timeline.flow_event as tracing_flow_event
15from telemetry.timeline import importer
16from telemetry.timeline import memory_dump_event
17from tracing.trace_data import trace_data as trace_data_module
18
19
20class TraceEventTimelineImporter(importer.TimelineImporter):
21  def __init__(self, model, trace_data):
22    super(TraceEventTimelineImporter, self).__init__(
23        model, trace_data, import_order=1)
24    assert isinstance(trace_data, trace_data_module.TraceData)
25    self._trace_data = trace_data
26
27    self._all_async_events = []
28    self._all_object_events = []
29    self._all_flow_events = []
30    self._all_memory_dumps_by_dump_id = collections.defaultdict(list)
31
32    self._events = []
33    self._metadata = []
34    for trace in trace_data.GetTracesFor(trace_data_module.CHROME_TRACE_PART):
35      self._events.extend(trace['traceEvents'])
36      self.CollectMetadataRecords(trace)
37
38  def CollectMetadataRecords(self, trace):
39    part_field_names = {p.raw_field_name for p in
40                        trace_data_module.ALL_TRACE_PARTS}
41    for k, v in trace.iteritems():
42      if k in part_field_names:
43        continue
44      self._metadata.append({'name': k, 'value': v})
45
46
  @staticmethod
  def GetSupportedPart():
    """Returns the trace part this importer handles (CHROME_TRACE_PART)."""
    return trace_data_module.CHROME_TRACE_PART
50
  def _GetOrCreateProcess(self, pid):
    """Returns the process for |pid| by delegating to
    self._model.GetOrCreateProcess."""
    return self._model.GetOrCreateProcess(pid)
53
54  def _DeepCopyIfNeeded(self, obj):
55    if self._trace_data.events_are_safely_mutable:
56      return obj
57    return copy.deepcopy(obj)
58
59  def _ProcessAsyncEvent(self, event):
60    """Helper to process an 'async finish' event, which will close an
61    open slice.
62    """
63    thread = (self._GetOrCreateProcess(event['pid'])
64        .GetOrCreateThread(event['tid']))
65    self._all_async_events.append({
66        'event': event,
67        'thread': thread})
68
69  def _ProcessCounterEvent(self, event):
70    """Helper that creates and adds samples to a Counter object based on
71    'C' phase events.
72    """
73    if 'id' in event:
74      ctr_name = event['name'] + '[' + str(event['id']) + ']'
75    else:
76      ctr_name = event['name']
77
78    ctr = (self._GetOrCreateProcess(event['pid'])
79        .GetOrCreateCounter(event['cat'], ctr_name))
80    # Initialize the counter's series fields if needed.
81    if len(ctr.series_names) == 0:
82      #TODO: implement counter object
83      for series_name in event['args']:
84        ctr.series_names.append(series_name)
85      if len(ctr.series_names) == 0:
86        self._model.import_errors.append('Expected counter ' + event['name'] +
87            ' to have at least one argument to use as a value.')
88        # Drop the counter.
89        del ctr.parent.counters[ctr.full_name]
90        return
91
92    # Add the sample values.
93    ctr.timestamps.append(event['ts'] / 1000.0)
94    for series_name in ctr.series_names:
95      if series_name not in event['args']:
96        ctr.samples.append(0)
97        continue
98      ctr.samples.append(event['args'][series_name])
99
100  def _ProcessObjectEvent(self, event):
101    thread = (self._GetOrCreateProcess(event['pid'])
102      .GetOrCreateThread(event['tid']))
103    self._all_object_events.append({
104        'event': event,
105        'thread': thread})
106
107  def _ProcessDurationEvent(self, event):
108    thread = (self._GetOrCreateProcess(event['pid'])
109      .GetOrCreateThread(event['tid']))
110    if not thread.IsTimestampValidForBeginOrEnd(event['ts'] / 1000.0):
111      self._model.import_errors.append(
112          'Timestamps are moving backward.')
113      return
114
115    if event['ph'] == 'B':
116      thread.BeginSlice(event['cat'],
117                        event['name'],
118                        event['ts'] / 1000.0,
119                        event['tts'] / 1000.0 if 'tts' in event else None,
120                        event['args'])
121    elif event['ph'] == 'E':
122      thread = (self._GetOrCreateProcess(event['pid'])
123        .GetOrCreateThread(event['tid']))
124      if not thread.IsTimestampValidForBeginOrEnd(event['ts'] / 1000.0):
125        self._model.import_errors.append(
126            'Timestamps are moving backward.')
127        return
128      if not thread.open_slice_count:
129        self._model.import_errors.append(
130            'E phase event without a matching B phase event.')
131        return
132
133      new_slice = thread.EndSlice(
134          event['ts'] / 1000.0,
135          event['tts'] / 1000.0 if 'tts' in event else None)
136      for arg_name, arg_value in event.get('args', {}).iteritems():
137        if arg_name in new_slice.args:
138          self._model.import_errors.append(
139              'Both the B and E phases of ' + new_slice.name +
140              ' provided values for argument ' + arg_name + '. ' +
141              'The value of the E phase event will be used.')
142        new_slice.args[arg_name] = arg_value
143
144  def _ProcessCompleteEvent(self, event):
145    thread = (self._GetOrCreateProcess(event['pid'])
146        .GetOrCreateThread(event['tid']))
147    thread.PushCompleteSlice(
148        event['cat'],
149        event['name'],
150        event['ts'] / 1000.0,
151        event['dur'] / 1000.0 if 'dur' in event else None,
152        event['tts'] / 1000.0 if 'tts' in event else None,
153        event['tdur'] / 1000.0 if 'tdur' in event else None,
154        event['args'])
155
156  def _ProcessMarkEvent(self, event):
157    thread = (self._GetOrCreateProcess(event['pid'])
158        .GetOrCreateThread(event['tid']))
159    thread.PushMarkSlice(
160        event['cat'],
161        event['name'],
162        event['ts'] / 1000.0,
163        event['tts'] / 1000.0 if 'tts' in event else None,
164        event['args'] if 'args' in event else None)
165
166  def _ProcessMetadataEvent(self, event):
167    if event['name'] == 'thread_name':
168      thread = (self._GetOrCreateProcess(event['pid'])
169          .GetOrCreateThread(event['tid']))
170      thread.name = event['args']['name']
171    elif event['name'] == 'process_name':
172      process = self._GetOrCreateProcess(event['pid'])
173      process.name = event['args']['name']
174    elif event['name'] == 'process_labels':
175      process = self._GetOrCreateProcess(event['pid'])
176      process.labels = event['args']['labels']
177    elif event['name'] == 'process_uptime_seconds':
178      process = self._GetOrCreateProcess(event['pid'])
179      process.uptime_seconds = event['args']['uptime']
180    elif event['name'] == 'trace_buffer_overflowed':
181      process = self._GetOrCreateProcess(event['pid'])
182      process.SetTraceBufferOverflowTimestamp(event['args']['overflowed_at_ts'])
183    else:
184      self._model.import_errors.append(
185          'Unrecognized metadata name: ' + event['name'])
186
187  def _ProcessInstantEvent(self, event):
188    # Treat an Instant event as a duration 0 slice.
189    # SliceTrack's redraw() knows how to handle this.
190    thread = (self._GetOrCreateProcess(event['pid'])
191      .GetOrCreateThread(event['tid']))
192    thread.BeginSlice(event['cat'],
193                      event['name'],
194                      event['ts'] / 1000.0,
195                      args=event.get('args'))
196    thread.EndSlice(event['ts'] / 1000.0)
197
198  def _ProcessSampleEvent(self, event):
199    thread = (self._GetOrCreateProcess(event['pid'])
200        .GetOrCreateThread(event['tid']))
201    thread.AddSample(event['cat'],
202                     event['name'],
203                     event['ts'] / 1000.0,
204                     event.get('args'))
205
206  def _ProcessFlowEvent(self, event):
207    thread = (self._GetOrCreateProcess(event['pid'])
208        .GetOrCreateThread(event['tid']))
209    self._all_flow_events.append({
210        'event': event,
211        'thread': thread})
212
213  def _ProcessMemoryDumpEvents(self, events):
214    # Dictionary to order dumps by id and process.
215    global_dumps = {}
216    for event in events:
217      global_dump = global_dumps.setdefault(event['id'], {})
218      dump_events = global_dump.setdefault(event['pid'], [])
219      dump_events.append(event)
220    for dump_id, global_dump in global_dumps.iteritems():
221      for pid, dump_events in global_dump.iteritems():
222        process = self._GetOrCreateProcess(pid)
223        memory_dump = memory_dump_event.ProcessMemoryDumpEvent(process,
224                                                               dump_events)
225        process.AddMemoryDumpEvent(memory_dump)
226        self._all_memory_dumps_by_dump_id[dump_id].append(memory_dump)
227
228  def ImportEvents(self):
229    """Walks through the events_ list and outputs the structures discovered to
230    model_.
231    """
232    for r in self._metadata:
233      self._model.metadata.append(r)
234    memory_dump_events = []
235    for event in self._events:
236      phase = event.get('ph', None)
237      if phase == 'B' or phase == 'E':
238        self._ProcessDurationEvent(event)
239      elif phase == 'X':
240        self._ProcessCompleteEvent(event)
241      # Note, S, F, T are deprecated and replaced by 'b' and 'e'. For
242      # backwards compatibility continue to support them here.
243      elif phase == 'S' or phase == 'F' or phase == 'T':
244        self._ProcessAsyncEvent(event)
245      elif phase == 'b' or phase == 'e':
246        self._ProcessAsyncEvent(event)
247      # Note, I is historic. The instant event marker got changed, but we
248      # want to support loading old trace files so we have both I and i.
249      elif phase == 'I' or phase == 'i':
250        self._ProcessInstantEvent(event)
251      elif phase == 'P':
252        self._ProcessSampleEvent(event)
253      elif phase == 'C':
254        self._ProcessCounterEvent(event)
255      elif phase == 'M':
256        self._ProcessMetadataEvent(event)
257      elif phase == 'N' or phase == 'D' or phase == 'O':
258        self._ProcessObjectEvent(event)
259      elif phase == 's' or phase == 't' or phase == 'f':
260        self._ProcessFlowEvent(event)
261      elif phase == 'v':
262        memory_dump_events.append(event)
263      elif phase == 'R':
264        self._ProcessMarkEvent(event)
265      else:
266        self._model.import_errors.append('Unrecognized event phase: ' +
267            phase + '(' + event['name'] + ')')
268
269    # Memory dumps of a process with the same dump id need to be merged before
270    # processing. So, memory dump events are processed all at once.
271    self._ProcessMemoryDumpEvents(memory_dump_events)
272    return self._model
273
  def FinalizeImport(self):
    """Called by the Model after all other importers have imported their
    events.

    Refreshes the model bounds and then runs the post-processing steps in a
    fixed order: async slices, flow slices, browser and GPU process
    detection, object creation (currently placeholders) and global memory
    dumps.
    """
    self._model.UpdateBounds()

    # We need to reupdate the bounds in case the minimum start time changes
    # NOTE(review): nothing visible here runs between the two UpdateBounds()
    # calls, so the second one looks redundant -- confirm before removing.
    self._model.UpdateBounds()
    self._CreateAsyncSlices()
    self._CreateFlowSlices()
    self._SetBrowserProcess()
    self._SetGpuProcess()
    self._CreateExplicitObjects()
    self._CreateImplicitObjects()
    self._CreateMemoryDumps()
288
289  def _CreateAsyncSlices(self):
290    if len(self._all_async_events) == 0:
291      return
292
293    self._all_async_events.sort(key=lambda x: x['event']['ts'])
294
295    async_event_states_by_name_then_id = {}
296
297    all_async_events = self._all_async_events
298    for async_event_state in all_async_events:
299      event = async_event_state['event']
300      name = event.get('name', None)
301      if name is None:
302        self._model.import_errors.append(
303            'Async events (ph: b, e, S, T or F) require an name parameter.')
304        continue
305
306      event_id = event.get('id')
307      if event_id is None:
308        self._model.import_errors.append(
309            'Async events (ph: b, e, S, T or F) require an id parameter.')
310        continue
311
312      # TODO(simonjam): Add a synchronous tick on the appropriate thread.
313
314      if event['ph'] == 'S' or event['ph'] == 'b':
315        if not name in async_event_states_by_name_then_id:
316          async_event_states_by_name_then_id[name] = {}
317        if event_id in async_event_states_by_name_then_id[name]:
318          self._model.import_errors.append(
319              'At %d, a slice of the same id %s was already open.' % (
320                  event['ts'], event_id))
321          continue
322
323        async_event_states_by_name_then_id[name][event_id] = []
324        async_event_states_by_name_then_id[name][event_id].append(
325            async_event_state)
326      else:
327        if name not in async_event_states_by_name_then_id:
328          self._model.import_errors.append(
329              'At %d, no slice named %s was open.' % (event['ts'], name,))
330          continue
331        if event_id not in async_event_states_by_name_then_id[name]:
332          self._model.import_errors.append(
333              'At %d, no slice named %s with id=%s was open.' % (
334                  event['ts'], name, event_id))
335          continue
336        events = async_event_states_by_name_then_id[name][event_id]
337        events.append(async_event_state)
338
339        if event['ph'] == 'F' or event['ph'] == 'e':
340          # Create a slice from start to end.
341          async_slice = tracing_async_slice.AsyncSlice(
342              events[0]['event']['cat'],
343              name,
344              events[0]['event']['ts'] / 1000.0)
345
346          async_slice.duration = ((event['ts'] / 1000.0)
347              - (events[0]['event']['ts'] / 1000.0))
348
349          async_slice.start_thread = events[0]['thread']
350          async_slice.end_thread = async_event_state['thread']
351          if async_slice.start_thread == async_slice.end_thread:
352            if 'tts' in event and 'tts' in events[0]['event']:
353              async_slice.thread_start = events[0]['event']['tts'] / 1000.0
354              async_slice.thread_duration = ((event['tts'] / 1000.0)
355                  - (events[0]['event']['tts'] / 1000.0))
356          async_slice.id = event_id
357          async_slice.args = events[0]['event']['args']
358
359          # Create sub_slices for each step.
360          for j in xrange(1, len(events)):
361            sub_name = name
362            if events[j - 1]['event']['ph'] == 'T':
363              sub_name = name + ':' + events[j - 1]['event']['args']['step']
364            sub_slice = tracing_async_slice.AsyncSlice(
365                events[0]['event']['cat'],
366                sub_name,
367                events[j - 1]['event']['ts'] / 1000.0)
368            sub_slice.parent_slice = async_slice
369
370            sub_slice.duration = ((events[j]['event']['ts'] / 1000.0)
371                - (events[j - 1]['event']['ts'] / 1000.0))
372
373            sub_slice.start_thread = events[j - 1]['thread']
374            sub_slice.end_thread = events[j]['thread']
375            if sub_slice.start_thread == sub_slice.end_thread:
376              if 'tts' in events[j]['event'] and \
377                  'tts' in events[j - 1]['event']:
378                sub_slice.thread_duration = \
379                    ((events[j]['event']['tts'] / 1000.0)
380                        - (events[j - 1]['event']['tts'] / 1000.0))
381
382            sub_slice.id = event_id
383            sub_slice.args = events[j - 1]['event']['args']
384
385            async_slice.AddSubSlice(sub_slice)
386
387          # The args for the finish event go in the last sub_slice.
388          last_slice = async_slice.sub_slices[-1]
389          for arg_name, arg_value in event['args'].iteritems():
390            last_slice.args[arg_name] = arg_value
391
392          # Add |async_slice| to the start-thread's async_slices.
393          async_slice.start_thread.AddAsyncSlice(async_slice)
394          del async_event_states_by_name_then_id[name][event_id]
395
  def _CreateExplicitObjects(self):
    """Placeholder called from FinalizeImport; currently does nothing."""
    # TODO(tengs): Implement object instance parsing
    pass
399
  def _CreateImplicitObjects(self):
    """Placeholder called from FinalizeImport; currently does nothing."""
    # TODO(tengs): Implement object instance parsing
    pass
403
404  def _CreateFlowSlices(self):
405    if len(self._all_flow_events) == 0:
406      return
407
408    self._all_flow_events.sort(key=lambda x: x['event']['ts'])
409
410    flow_id_to_event = {}
411    for data in self._all_flow_events:
412      event = data['event']
413      thread = data['thread']
414      if 'name' not in event:
415        self._model.import_errors.append(
416          'Flow events (ph: s, t or f) require a name parameter.')
417        continue
418      if 'id' not in event:
419        self._model.import_errors.append(
420          'Flow events (ph: s, t or f) require an id parameter.')
421        continue
422
423      flow_event = tracing_flow_event.FlowEvent(
424          event['cat'],
425          event['id'],
426          event['name'],
427          event['ts'] / 1000.0,
428          event['args'])
429      thread.AddFlowEvent(flow_event)
430
431      if event['ph'] == 's':
432        if event['id'] in flow_id_to_event:
433          self._model.import_errors.append(
434              'event id %s already seen when encountering start of'
435              'flow event.' % event['id'])
436          continue
437        flow_id_to_event[event['id']] = flow_event
438      elif event['ph'] == 't' or event['ph'] == 'f':
439        if not event['id'] in flow_id_to_event:
440          self._model.import_errors.append(
441            'Found flow phase %s for id: %s but no flow start found.' % (
442                event['ph'], event['id']))
443          continue
444        flow_position = flow_id_to_event[event['id']]
445        self._model.flow_events.append([flow_position, flow_event])
446
447        if event['ph'] == 'f':
448          del flow_id_to_event[event['id']]
449        else:
450          # Make this event the next start event in this flow.
451          flow_id_to_event[event['id']] = flow_event
452
453  def _CreateMemoryDumps(self):
454    self._model.SetGlobalMemoryDumps(
455        memory_dump_event.GlobalMemoryDump(events)
456        for events in self._all_memory_dumps_by_dump_id.itervalues())
457
458  def _SetBrowserProcess(self):
459    for thread in self._model.GetAllThreads():
460      if thread.name == 'CrBrowserMain':
461        self._model.browser_process = thread.parent
462
463  def _SetGpuProcess(self):
464    for thread in self._model.GetAllThreads():
465      if thread.name == 'CrGpuMain':
466        self._model.gpu_process = thread.parent
467