1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""TraceEventImporter imports TraceEvent-formatted data
5into the provided model.
6This is a port of the trace event importer from
7https://code.google.com/p/trace-viewer/
8"""
9
10import collections
11import copy
12
13import telemetry.timeline.async_slice as tracing_async_slice
14import telemetry.timeline.flow_event as tracing_flow_event
15from telemetry.timeline import importer
16from telemetry.timeline import memory_dump_event
17from telemetry.timeline import trace_data as trace_data_module
18
19
20class TraceEventTimelineImporter(importer.TimelineImporter):
21  def __init__(self, model, trace_data):
22    super(TraceEventTimelineImporter, self).__init__(
23        model, trace_data, import_order=1)
24    assert isinstance(trace_data, trace_data_module.TraceData)
25    self._trace_data = trace_data
26
27    self._all_async_events = []
28    self._all_object_events = []
29    self._all_flow_events = []
30    self._all_memory_dumps_by_dump_id = collections.defaultdict(list)
31
32    self._events = trace_data.GetEventsFor(trace_data_module.CHROME_TRACE_PART)
33
34  @staticmethod
35  def GetSupportedPart():
36    return trace_data_module.CHROME_TRACE_PART
37
38  def _GetOrCreateProcess(self, pid):
39    return self._model.GetOrCreateProcess(pid)
40
41  def _DeepCopyIfNeeded(self, obj):
42    if self._trace_data.events_are_safely_mutable:
43      return obj
44    return copy.deepcopy(obj)
45
46  def _ProcessAsyncEvent(self, event):
47    """Helper to process an 'async finish' event, which will close an
48    open slice.
49    """
50    thread = (self._GetOrCreateProcess(event['pid'])
51        .GetOrCreateThread(event['tid']))
52    self._all_async_events.append({
53        'event': event,
54        'thread': thread})
55
56  def _ProcessCounterEvent(self, event):
57    """Helper that creates and adds samples to a Counter object based on
58    'C' phase events.
59    """
60    if 'id' in event:
61      ctr_name = event['name'] + '[' + str(event['id']) + ']'
62    else:
63      ctr_name = event['name']
64
65    ctr = (self._GetOrCreateProcess(event['pid'])
66        .GetOrCreateCounter(event['cat'], ctr_name))
67    # Initialize the counter's series fields if needed.
68    if len(ctr.series_names) == 0:
69      #TODO: implement counter object
70      for series_name in event['args']:
71        ctr.series_names.append(series_name)
72      if len(ctr.series_names) == 0:
73        self._model.import_errors.append('Expected counter ' + event['name'] +
74            ' to have at least one argument to use as a value.')
75        # Drop the counter.
76        del ctr.parent.counters[ctr.full_name]
77        return
78
79    # Add the sample values.
80    ctr.timestamps.append(event['ts'] / 1000.0)
81    for series_name in ctr.series_names:
82      if series_name not in event['args']:
83        ctr.samples.append(0)
84        continue
85      ctr.samples.append(event['args'][series_name])
86
87  def _ProcessObjectEvent(self, event):
88    thread = (self._GetOrCreateProcess(event['pid'])
89      .GetOrCreateThread(event['tid']))
90    self._all_object_events.append({
91        'event': event,
92        'thread': thread})
93
94  def _ProcessDurationEvent(self, event):
95    thread = (self._GetOrCreateProcess(event['pid'])
96      .GetOrCreateThread(event['tid']))
97    if not thread.IsTimestampValidForBeginOrEnd(event['ts'] / 1000.0):
98      self._model.import_errors.append(
99          'Timestamps are moving backward.')
100      return
101
102    if event['ph'] == 'B':
103      thread.BeginSlice(event['cat'],
104                        event['name'],
105                        event['ts'] / 1000.0,
106                        event['tts'] / 1000.0 if 'tts' in event else None,
107                        event['args'])
108    elif event['ph'] == 'E':
109      thread = (self._GetOrCreateProcess(event['pid'])
110        .GetOrCreateThread(event['tid']))
111      if not thread.IsTimestampValidForBeginOrEnd(event['ts'] / 1000.0):
112        self._model.import_errors.append(
113            'Timestamps are moving backward.')
114        return
115      if not thread.open_slice_count:
116        self._model.import_errors.append(
117            'E phase event without a matching B phase event.')
118        return
119
120      new_slice = thread.EndSlice(
121          event['ts'] / 1000.0,
122          event['tts'] / 1000.0 if 'tts' in event else None)
123      for arg_name, arg_value in event.get('args', {}).iteritems():
124        if arg_name in new_slice.args:
125          self._model.import_errors.append(
126              'Both the B and E phases of ' + new_slice.name +
127              ' provided values for argument ' + arg_name + '. ' +
128              'The value of the E phase event will be used.')
129        new_slice.args[arg_name] = arg_value
130
131  def _ProcessCompleteEvent(self, event):
132    thread = (self._GetOrCreateProcess(event['pid'])
133        .GetOrCreateThread(event['tid']))
134    thread.PushCompleteSlice(
135        event['cat'],
136        event['name'],
137        event['ts'] / 1000.0,
138        event['dur'] / 1000.0 if 'dur' in event else None,
139        event['tts'] / 1000.0 if 'tts' in event else None,
140        event['tdur'] / 1000.0 if 'tdur' in event else None,
141        event['args'])
142
143  def _ProcessMarkEvent(self, event):
144    thread = (self._GetOrCreateProcess(event['pid'])
145        .GetOrCreateThread(event['tid']))
146    thread.PushMarkSlice(
147        event['cat'],
148        event['name'],
149        event['ts'] / 1000.0,
150        event['tts'] / 1000.0 if 'tts' in event else None,
151        event['args'] if 'args' in event else None)
152
153  def _ProcessMetadataEvent(self, event):
154    if event['name'] == 'thread_name':
155      thread = (self._GetOrCreateProcess(event['pid'])
156          .GetOrCreateThread(event['tid']))
157      thread.name = event['args']['name']
158    elif event['name'] == 'process_name':
159      process = self._GetOrCreateProcess(event['pid'])
160      process.name = event['args']['name']
161    elif event['name'] == 'process_labels':
162      process = self._GetOrCreateProcess(event['pid'])
163      process.labels = event['args']['labels']
164    elif event['name'] == 'trace_buffer_overflowed':
165      process = self._GetOrCreateProcess(event['pid'])
166      process.SetTraceBufferOverflowTimestamp(event['args']['overflowed_at_ts'])
167    else:
168      self._model.import_errors.append(
169          'Unrecognized metadata name: ' + event['name'])
170
171  def _ProcessInstantEvent(self, event):
172    # Treat an Instant event as a duration 0 slice.
173    # SliceTrack's redraw() knows how to handle this.
174    thread = (self._GetOrCreateProcess(event['pid'])
175      .GetOrCreateThread(event['tid']))
176    thread.BeginSlice(event['cat'],
177                      event['name'],
178                      event['ts'] / 1000.0,
179                      args=event.get('args'))
180    thread.EndSlice(event['ts'] / 1000.0)
181
182  def _ProcessSampleEvent(self, event):
183    thread = (self._GetOrCreateProcess(event['pid'])
184        .GetOrCreateThread(event['tid']))
185    thread.AddSample(event['cat'],
186                     event['name'],
187                     event['ts'] / 1000.0,
188                     event.get('args'))
189
190  def _ProcessFlowEvent(self, event):
191    thread = (self._GetOrCreateProcess(event['pid'])
192        .GetOrCreateThread(event['tid']))
193    self._all_flow_events.append({
194        'event': event,
195        'thread': thread})
196
197  def _ProcessMemoryDumpEvents(self, events):
198    # Dictionary to order dumps by id and process.
199    global_dumps = {}
200    for event in events:
201      global_dump = global_dumps.setdefault(event['id'], {})
202      dump_events = global_dump.setdefault(event['pid'], [])
203      dump_events.append(event)
204    for dump_id, global_dump in global_dumps.iteritems():
205      for pid, dump_events in global_dump.iteritems():
206        process = self._GetOrCreateProcess(pid)
207        memory_dump = memory_dump_event.ProcessMemoryDumpEvent(process,
208                                                               dump_events)
209        process.AddMemoryDumpEvent(memory_dump)
210        self._all_memory_dumps_by_dump_id[dump_id].append(memory_dump)
211
212  def ImportEvents(self):
213    """Walks through the events_ list and outputs the structures discovered to
214    model_.
215    """
216    memory_dump_events = []
217    for event in self._events:
218      phase = event.get('ph', None)
219      if phase == 'B' or phase == 'E':
220        self._ProcessDurationEvent(event)
221      elif phase == 'X':
222        self._ProcessCompleteEvent(event)
223      # Note, S, F, T are deprecated and replaced by 'b' and 'e'. For
224      # backwards compatibility continue to support them here.
225      elif phase == 'S' or phase == 'F' or phase == 'T':
226        self._ProcessAsyncEvent(event)
227      elif phase == 'b' or phase == 'e':
228        self._ProcessAsyncEvent(event)
229      # Note, I is historic. The instant event marker got changed, but we
230      # want to support loading old trace files so we have both I and i.
231      elif phase == 'I' or phase == 'i':
232        self._ProcessInstantEvent(event)
233      elif phase == 'P':
234        self._ProcessSampleEvent(event)
235      elif phase == 'C':
236        self._ProcessCounterEvent(event)
237      elif phase == 'M':
238        self._ProcessMetadataEvent(event)
239      elif phase == 'N' or phase == 'D' or phase == 'O':
240        self._ProcessObjectEvent(event)
241      elif phase == 's' or phase == 't' or phase == 'f':
242        self._ProcessFlowEvent(event)
243      elif phase == 'v':
244        memory_dump_events.append(event)
245      elif phase == 'R':
246        self._ProcessMarkEvent(event)
247      else:
248        self._model.import_errors.append('Unrecognized event phase: ' +
249            phase + '(' + event['name'] + ')')
250
251    # Memory dumps of a process with the same dump id need to be merged before
252    # processing. So, memory dump events are processed all at once.
253    self._ProcessMemoryDumpEvents(memory_dump_events)
254    return self._model
255
256  def FinalizeImport(self):
257    """Called by the Model after all other importers have imported their
258    events."""
259    self._model.UpdateBounds()
260
261    # We need to reupdate the bounds in case the minimum start time changes
262    self._model.UpdateBounds()
263    self._CreateAsyncSlices()
264    self._CreateFlowSlices()
265    self._SetBrowserProcess()
266    self._SetGpuProcess()
267    self._CreateExplicitObjects()
268    self._CreateImplicitObjects()
269    self._CreateMemoryDumps()
270
271  def _CreateAsyncSlices(self):
272    if len(self._all_async_events) == 0:
273      return
274
275    self._all_async_events.sort(key=lambda x: x['event']['ts'])
276
277    async_event_states_by_name_then_id = {}
278
279    all_async_events = self._all_async_events
280    for async_event_state in all_async_events:
281      event = async_event_state['event']
282      name = event.get('name', None)
283      if name is None:
284        self._model.import_errors.append(
285            'Async events (ph: b, e, S, T or F) require an name parameter.')
286        continue
287
288      event_id = event.get('id')
289      if event_id is None:
290        self._model.import_errors.append(
291            'Async events (ph: b, e, S, T or F) require an id parameter.')
292        continue
293
294      # TODO(simonjam): Add a synchronous tick on the appropriate thread.
295
296      if event['ph'] == 'S' or event['ph'] == 'b':
297        if not name in async_event_states_by_name_then_id:
298          async_event_states_by_name_then_id[name] = {}
299        if event_id in async_event_states_by_name_then_id[name]:
300          self._model.import_errors.append(
301              'At %d, a slice of the same id %s was already open.' % (
302                  event['ts'], event_id))
303          continue
304
305        async_event_states_by_name_then_id[name][event_id] = []
306        async_event_states_by_name_then_id[name][event_id].append(
307            async_event_state)
308      else:
309        if name not in async_event_states_by_name_then_id:
310          self._model.import_errors.append(
311              'At %d, no slice named %s was open.' % (event['ts'], name,))
312          continue
313        if event_id not in async_event_states_by_name_then_id[name]:
314          self._model.import_errors.append(
315              'At %d, no slice named %s with id=%s was open.' % (
316                  event['ts'], name, event_id))
317          continue
318        events = async_event_states_by_name_then_id[name][event_id]
319        events.append(async_event_state)
320
321        if event['ph'] == 'F' or event['ph'] == 'e':
322          # Create a slice from start to end.
323          async_slice = tracing_async_slice.AsyncSlice(
324              events[0]['event']['cat'],
325              name,
326              events[0]['event']['ts'] / 1000.0)
327
328          async_slice.duration = ((event['ts'] / 1000.0)
329              - (events[0]['event']['ts'] / 1000.0))
330
331          async_slice.start_thread = events[0]['thread']
332          async_slice.end_thread = async_event_state['thread']
333          if async_slice.start_thread == async_slice.end_thread:
334            if 'tts' in event and 'tts' in events[0]['event']:
335              async_slice.thread_start = events[0]['event']['tts'] / 1000.0
336              async_slice.thread_duration = ((event['tts'] / 1000.0)
337                  - (events[0]['event']['tts'] / 1000.0))
338          async_slice.id = event_id
339          async_slice.args = events[0]['event']['args']
340
341          # Create sub_slices for each step.
342          for j in xrange(1, len(events)):
343            sub_name = name
344            if events[j - 1]['event']['ph'] == 'T':
345              sub_name = name + ':' + events[j - 1]['event']['args']['step']
346            sub_slice = tracing_async_slice.AsyncSlice(
347                events[0]['event']['cat'],
348                sub_name,
349                events[j - 1]['event']['ts'] / 1000.0)
350            sub_slice.parent_slice = async_slice
351
352            sub_slice.duration = ((events[j]['event']['ts'] / 1000.0)
353                - (events[j - 1]['event']['ts'] / 1000.0))
354
355            sub_slice.start_thread = events[j - 1]['thread']
356            sub_slice.end_thread = events[j]['thread']
357            if sub_slice.start_thread == sub_slice.end_thread:
358              if 'tts' in events[j]['event'] and \
359                  'tts' in events[j - 1]['event']:
360                sub_slice.thread_duration = \
361                    ((events[j]['event']['tts'] / 1000.0)
362                        - (events[j - 1]['event']['tts'] / 1000.0))
363
364            sub_slice.id = event_id
365            sub_slice.args = events[j - 1]['event']['args']
366
367            async_slice.AddSubSlice(sub_slice)
368
369          # The args for the finish event go in the last sub_slice.
370          last_slice = async_slice.sub_slices[-1]
371          for arg_name, arg_value in event['args'].iteritems():
372            last_slice.args[arg_name] = arg_value
373
374          # Add |async_slice| to the start-thread's async_slices.
375          async_slice.start_thread.AddAsyncSlice(async_slice)
376          del async_event_states_by_name_then_id[name][event_id]
377
378  def _CreateExplicitObjects(self):
379    # TODO(tengs): Implement object instance parsing
380    pass
381
382  def _CreateImplicitObjects(self):
383    # TODO(tengs): Implement object instance parsing
384    pass
385
386  def _CreateFlowSlices(self):
387    if len(self._all_flow_events) == 0:
388      return
389
390    self._all_flow_events.sort(key=lambda x: x['event']['ts'])
391
392    flow_id_to_event = {}
393    for data in self._all_flow_events:
394      event = data['event']
395      thread = data['thread']
396      if 'name' not in event:
397        self._model.import_errors.append(
398          'Flow events (ph: s, t or f) require a name parameter.')
399        continue
400      if 'id' not in event:
401        self._model.import_errors.append(
402          'Flow events (ph: s, t or f) require an id parameter.')
403        continue
404
405      flow_event = tracing_flow_event.FlowEvent(
406          event['cat'],
407          event['id'],
408          event['name'],
409          event['ts'] / 1000.0,
410          event['args'])
411      thread.AddFlowEvent(flow_event)
412
413      if event['ph'] == 's':
414        if event['id'] in flow_id_to_event:
415          self._model.import_errors.append(
416              'event id %s already seen when encountering start of'
417              'flow event.' % event['id'])
418          continue
419        flow_id_to_event[event['id']] = flow_event
420      elif event['ph'] == 't' or event['ph'] == 'f':
421        if not event['id'] in flow_id_to_event:
422          self._model.import_errors.append(
423            'Found flow phase %s for id: %s but no flow start found.' % (
424                event['ph'], event['id']))
425          continue
426        flow_position = flow_id_to_event[event['id']]
427        self._model.flow_events.append([flow_position, flow_event])
428
429        if event['ph'] == 'f':
430          del flow_id_to_event[event['id']]
431        else:
432          # Make this event the next start event in this flow.
433          flow_id_to_event[event['id']] = flow_event
434
435  def _CreateMemoryDumps(self):
436    self._model.SetGlobalMemoryDumps(
437        memory_dump_event.GlobalMemoryDump(events)
438        for events in self._all_memory_dumps_by_dump_id.itervalues())
439
440  def _SetBrowserProcess(self):
441    for thread in self._model.GetAllThreads():
442      if thread.name == 'CrBrowserMain':
443        self._model.browser_process = thread.parent
444
445  def _SetGpuProcess(self):
446    for thread in self._model.GetAllThreads():
447      if thread.name == 'CrGpuMain':
448        self._model.gpu_process = thread.parent
449