1# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Timeline visualization for TensorFlow using Chrome Trace Format."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21import collections
22import copy
23import json
24import re
25
# The timeline target is usually imported as part of BUILD target
# "platform_test", which also includes the "platform"
# dependency.  This is why the logging import here is okay.
29from tensorflow.python.platform import tf_logging as logging
30
31
class AllocationMaximum(collections.namedtuple(
    'AllocationMaximum', ('timestamp', 'num_bytes', 'tensors'))):
  """Stores the maximum allocation for a given allocator within the timeline.

  Parameters:
    timestamp: `tensorflow::Env::NowMicros()` when this maximum was reached.
    num_bytes: the total memory used at this time.
    tensors: the set of tensors allocated at this time.
  """
  # An empty __slots__ stops this subclass from growing a per-instance
  # __dict__, so instances stay as compact as a plain namedtuple.
  __slots__ = ()
42
43
class StepStatsAnalysis(collections.namedtuple(
    'StepStatsAnalysis', ('chrome_trace', 'allocator_maximums'))):
  """Stores the step stats analysis output.

  Parameters:
    chrome_trace: A dict containing the chrome trace analysis.
    allocator_maximums: A dict mapping allocator names to AllocationMaximum.
  """
  # An empty __slots__ stops this subclass from growing a per-instance
  # __dict__, so instances stay as compact as a plain namedtuple.
  __slots__ = ()
53
54
class _ChromeTraceFormatter(object):
  """Collects trace events and serializes them in Chrome Trace Format."""

  def __init__(self, show_memory=False):
    """Creates a formatter holding no metadata and no events yet."""
    self._metadata = []
    self._events = []
    self._show_memory = show_memory

  def _create_event(self, ph, category, name, pid, tid, timestamp):
    """Builds the fields shared by every non-metadata trace event.

    For details of the file format, see:
    https://github.com/catapult-project/catapult/blob/master/tracing/README.md

    Args:
      ph:  The type of event - usually a single character.
      category: The event category as a string.
      name:  The event name as a string.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      timestamp:  The timestamp of this event as a long integer.

    Returns:
      A JSON compatible event object.
    """
    return {
        'ph': ph,
        'cat': category,
        'name': name,
        'pid': pid,
        'tid': tid,
        'ts': timestamp,
    }

  def _record(self, event, *fields):
    """Sets the given (key, value) pairs on 'event' in order and stores it.

    Args:
      event:  An event dictionary, as returned by '_create_event'.
      *fields:  (key, value) tuples to add to the event before storing it.
    """
    for key, value in fields:
      event[key] = value
    self._events.append(event)

  def emit_pid(self, name, pid):
    """Records a metadata event that gives a process its display name.

    Args:
      name:  The process name as a string.
      pid:  Identifier of the process as an integer.
    """
    self._metadata.append({
        'name': 'process_name',
        'ph': 'M',
        'pid': pid,
        'args': {'name': name},
    })

  def emit_tid(self, name, pid, tid):
    """Records a metadata event that gives a thread its display name.

    Args:
      name:  The thread name as a string.
      pid:  Identifier of the process as an integer.
      tid:  Identifier of the thread as an integer.
    """
    self._metadata.append({
        'name': 'thread_name',
        'ph': 'M',
        'pid': pid,
        'tid': tid,
        'args': {'name': name},
    })

  def emit_region(self, timestamp, duration, pid, tid, category, name, args):
    """Records a complete ('X') event covering a span of time.

    Args:
      timestamp:  The start timestamp of this region as a long integer.
      duration:  The duration of this region as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      category: The event category as a string.
      name:  The event name as a string.
      args:  A JSON-compatible dictionary of event arguments.
    """
    self._record(
        self._create_event('X', category, name, pid, tid, timestamp),
        ('dur', duration), ('args', args))

  def emit_obj_create(self, category, name, timestamp, pid, tid, object_id):
    """Records an object creation ('N') event.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
    """
    self._record(
        self._create_event('N', category, name, pid, tid, timestamp),
        ('id', object_id))

  def emit_obj_delete(self, category, name, timestamp, pid, tid, object_id):
    """Records an object deletion ('D') event.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
    """
    self._record(
        self._create_event('D', category, name, pid, tid, timestamp),
        ('id', object_id))

  def emit_obj_snapshot(self, category, name, timestamp, pid, tid, object_id,
                        snapshot):
    """Records an object snapshot ('O') event.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      object_id: Identifier of the object as an integer.
      snapshot:  A JSON-compatible representation of the object.
    """
    self._record(
        self._create_event('O', category, name, pid, tid, timestamp),
        ('id', object_id), ('args', {'snapshot': snapshot}))

  def emit_flow_start(self, name, timestamp, pid, tid, flow_id):
    """Records a flow start ('s') event in the 'DataFlow' category.

    When matched with a flow end event (with the same 'flow_id') this will
    cause the trace viewer to draw an arrow between the start and end events.

    Args:
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      flow_id: Identifier of the flow as an integer.
    """
    self._record(
        self._create_event('s', 'DataFlow', name, pid, tid, timestamp),
        ('id', flow_id))

  def emit_flow_end(self, name, timestamp, pid, tid, flow_id):
    """Records a flow end ('t') event in the 'DataFlow' category.

    When matched with a flow start event (with the same 'flow_id') this will
    cause the trace viewer to draw an arrow between the start and end events.

    Args:
      name:  The event name as a string.
      timestamp:  The timestamp of this event as a long integer.
      pid:  Identifier of the process generating this event as an integer.
      tid:  Identifier of the thread generating this event as an integer.
      flow_id: Identifier of the flow as an integer.
    """
    self._record(
        self._create_event('t', 'DataFlow', name, pid, tid, timestamp),
        ('id', flow_id))

  def emit_counter(self, category, name, pid, timestamp, counter, value):
    """Records a counter ('C') event carrying a single named value.

    The event is always attributed to thread id 0.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      pid:  Identifier of the process generating this event as an integer.
      timestamp:  The timestamp of this event as a long integer.
      counter: Name of the counter as a string.
      value:  Value of the counter as an integer.
    """
    self._record(
        self._create_event('C', category, name, pid, 0, timestamp),
        ('args', {counter: value}))

  def emit_counters(self, category, name, pid, timestamp, counters):
    """Records a counter ('C') event carrying every entry of 'counters'.

    The event is always attributed to thread id 0.  A copy of 'counters' is
    stored, so later changes to the caller's dictionary do not affect the
    recorded event.

    Args:
      category: The event category as a string.
      name:  The event name as a string.
      pid:  Identifier of the process generating this event as an integer.
      timestamp:  The timestamp of this event as a long integer.
      counters: Dictionary of counter values.
    """
    self._record(
        self._create_event('C', category, name, pid, 0, timestamp),
        ('args', counters.copy()))

  def format_to_string(self, pretty=False):
    """Serializes all recorded events, metadata first, to a JSON string.

    Args:
      pretty: (Optional.)  If True, produce human-readable JSON output.

    Returns:
      A JSON-formatted string in Chrome Trace format.
    """
    trace = {'traceEvents': self._metadata + self._events}
    if not pretty:
      return json.dumps(trace, separators=(',', ':'))
    return json.dumps(trace, indent=4, separators=(',', ': '))
263
264
class _TensorTracker(object):
  """Records the creation, reference and unreference times of one Tensor."""

  def __init__(self, name, object_id, timestamp, pid, allocator, num_bytes):
    """Initializes a tracker with empty reference and unreference histories.

    This class is not thread safe and is intended only for internal use by
    the 'Timeline' class in this file.

    Args:
      name:  The name of the Tensor as a string.
      object_id:  Chrome Trace object identifier assigned for this Tensor.
      timestamp:  The creation timestamp of this event as a long integer.
      pid:  Process identifier of the associated device, as an integer.
      allocator:  Name of the allocator used to create the Tensor.
      num_bytes:  Number of bytes allocated (long integer).

    Returns:
      A 'TensorTracker' object.
    """
    # Timestamps are appended by add_ref() / add_unref().
    self._ref_times = []
    self._unref_times = []
    self._name = name
    self._object_id = object_id
    self._pid = pid
    self._allocator = allocator
    self._num_bytes = num_bytes
    self._create_time = timestamp

  def add_ref(self, timestamp):
    """Records that this tensor was referenced at 'timestamp'.

    Args:
      timestamp:  Timestamp of object reference as an integer.
    """
    self._ref_times.append(timestamp)

  def add_unref(self, timestamp):
    """Records that this tensor was unreferenced at 'timestamp'.

    Args:
      timestamp:  Timestamp of object unreference as an integer.
    """
    self._unref_times.append(timestamp)

  @property
  def last_unref(self):
    """Latest recorded unreference timestamp (long integer).

    Raises ValueError if add_unref() has never been called on this tracker.
    """
    return max(self._unref_times)

  @property
  def name(self):
    """The tensor's name."""
    return self._name

  @property
  def object_id(self):
    """The Chrome Trace object identifier of this tensor (integer)."""
    return self._object_id

  @property
  def pid(self):
    """The process ID given for this tensor at construction (an integer)."""
    return self._pid

  @property
  def create_time(self):
    """The timestamp at which this tensor was created (long integer)."""
    return self._create_time

  @property
  def allocator(self):
    """The name of the allocator that created this tensor (string)."""
    return self._allocator

  @property
  def num_bytes(self):
    """The size of this tensor in bytes (long integer)."""
    return self._num_bytes
344
345
class Timeline(object):
  """A class for visualizing execution timelines of TensorFlow steps."""

  def __init__(self, step_stats, graph=None):
    """Constructs a new Timeline.

    A 'Timeline' is used for visualizing the execution of a TensorFlow
    computation.  It shows the timings and concurrency of execution at
    the granularity of TensorFlow Ops.
    This class is not thread safe.

    Args:
      step_stats: The 'StepStats' proto recording execution times.
      graph: (Optional) The 'Graph' that was executed.
    """

    self._step_stats = step_stats
    self._graph = graph
    self._chrome_trace = _ChromeTraceFormatter()
    self._next_pid = 0
    self._allocators_pid = None  # pid of 'Allocators'; set by _allocate_pids.
    self._device_pids = {}  # device name -> pid for compute activity.
    self._tensor_pids = {}  # device name -> pid for tensors.
    self._tensors = {}  # tensor_name -> TensorTracker
    self._next_object_id = 0
    self._next_flow_id = 0
    self._flow_starts = {}  # tensor_name -> (timestamp, pid, tid)
    self._alloc_times = {}  # tensor_name -> ( time, allocator, size )
    self._allocator_maximums = {}  # allocator name -> AllocationMaximum

  def _alloc_pid(self):
    """Allocate a process Id."""
    pid = self._next_pid
    self._next_pid += 1
    return pid

  def _alloc_object_id(self):
    """Allocate a Chrome Trace object Id that is never handed out twice."""
    object_id = self._next_object_id
    self._next_object_id += 1
    return object_id

  def _alloc_flow_id(self):
    """Allocate a flow Id."""
    flow_id = self._next_flow_id
    self._next_flow_id += 1
    return flow_id

  def _parse_op_label(self, label):
    """Parses the fields in a node timeline label.

    Args:
      label: A label string of the form 'name = op(arg, arg, ...)'.

    Returns:
      A (name, op, inputs) tuple where 'inputs' is a list of strings.  If the
      label does not have the expected form, ('unknown', 'unknown', []).
    """
    # Expects labels of the form: name = op(arg, arg, ...).
    match = re.match(r'(.*) = (.*)\((.*)\)', label)
    if match is None:
      return 'unknown', 'unknown', []
    nn, op, inputs = match.groups()
    if not inputs:
      inputs = []
    else:
      inputs = inputs.split(', ')
    return nn, op, inputs

  def _assign_lanes(self):
    """Assigns non-overlapping lanes for the activities on each device.

    The chosen lane index is stored in the 'thread_id' field of each node's
    stats.
    """
    for device_stats in self._step_stats.dev_stats:
      # TODO(pbar): Genuine thread IDs in NodeExecStats might be helpful.
      # lanes[i] holds the end time of the last activity placed in lane i.
      lanes = [0]
      for ns in device_stats.node_stats:
        end_micros = ns.all_start_micros + ns.all_end_rel_micros
        for lane, last_end in enumerate(lanes):
          if ns.all_start_micros > last_end:
            # First lane that is already free when this activity starts.
            lanes[lane] = end_micros
            break
        else:
          # Every existing lane is still busy, so open a new one.
          lane = len(lanes)
          lanes.append(end_micros)
        ns.thread_id = lane

  def _emit_op(self, nodestats, pid, is_gputrace):
    """Generates a Chrome Trace event to show Op execution.

    Args:
      nodestats: The 'NodeExecStats' proto recording op execution.
      pid: The pid assigned for the device where this op ran.
      is_gputrace: If True then this op came from the GPUTracer.
    """
    node_name = nodestats.node_name
    start = nodestats.all_start_micros
    duration = nodestats.all_end_rel_micros
    tid = nodestats.thread_id
    inputs = []
    if is_gputrace:
      # Node names should always have the form 'name:op'.  The appended
      # 'unknown' supplies the op when the name contains no ':'.
      fields = node_name.split(':') + ['unknown']
      node_name, op = fields[:2]
    elif node_name == 'RecvTensor':
      # RPC tracing does not use the standard timeline_label format.
      op = 'RecvTensor'
    else:
      _, op, inputs = self._parse_op_label(nodestats.timeline_label)
    args = {'name': node_name, 'op': op}
    for i, iname in enumerate(inputs):
      args['input%d' % i] = iname
    self._chrome_trace.emit_region(start, duration, pid, tid, 'Op', op, args)

  def _emit_tensor_snapshot(self, tensor, timestamp, pid, tid, value):
    """Generate Chrome Trace snapshot event for a computed Tensor.

    Args:
      tensor: A 'TensorTracker' object.
      timestamp:  The timestamp of this snapshot as a long integer.
      pid: The pid assigned for showing the device where this op ran.
      tid: The tid of the thread computing the tensor snapshot.
      value: An object whose 'tensor_description' attribute is recorded in
        string form, with double quotes removed.
    """
    desc = str(value.tensor_description).replace('"', '')
    snapshot = {'tensor_description': desc}
    self._chrome_trace.emit_obj_snapshot('Tensor', tensor.name, timestamp, pid,
                                         tid, tensor.object_id, snapshot)

  def _produce_tensor(self, name, timestamp, tensors_pid, allocator, num_bytes):
    """Creates a tracker for a produced tensor and registers it by name.

    If a tracker is already registered under 'name' it is replaced.

    Args:
      name: The name of the tensor as a string.
      timestamp: The creation timestamp of the tensor.
      tensors_pid: The pid assigned for showing tensors of the device.
      allocator: Name of the allocator used to create the tensor.
      num_bytes: Number of bytes allocated for the tensor.

    Returns:
      The new 'TensorTracker' object.
    """
    # Use a dedicated counter rather than len(self._tensors): re-producing an
    # existing name does not grow the dict, so a length-based id would be
    # handed out again to the next new tensor.
    object_id = self._alloc_object_id()
    tensor = _TensorTracker(name, object_id, timestamp, tensors_pid, allocator,
                            num_bytes)
    self._tensors[name] = tensor
    return tensor

  def _is_gputrace_device(self, device_name):
    """Returns true if this device is part of the GPUTracer logging."""
    return '/stream:' in device_name or '/memcpy' in device_name

  def _allocate_pids(self):
    """Allocate fake process ids for each device in the StepStats."""
    self._allocators_pid = self._alloc_pid()
    self._chrome_trace.emit_pid('Allocators', self._allocators_pid)

    # Add processes in the Chrome trace to show compute and data activity.
    for dev_stats in self._step_stats.dev_stats:
      device_pid = self._alloc_pid()
      self._device_pids[dev_stats.device] = device_pid
      tensors_pid = self._alloc_pid()
      self._tensor_pids[dev_stats.device] = tensors_pid
      self._chrome_trace.emit_pid(dev_stats.device + ' Compute', device_pid)
      self._chrome_trace.emit_pid(dev_stats.device + ' Tensors', tensors_pid)

  def _analyze_tensors(self, show_memory):
    """Analyze tensor references to track dataflow.

    Args:
      show_memory: If True, also emit object creation and snapshot events for
        every output tensor.
    """
    for dev_stats in self._step_stats.dev_stats:
      device_pid = self._device_pids[dev_stats.device]
      tensors_pid = self._tensor_pids[dev_stats.device]
      for node_stats in dev_stats.node_stats:
        tid = node_stats.thread_id
        node_name = node_stats.node_name
        start_time = node_stats.all_start_micros
        end_time = node_stats.all_start_micros + node_stats.all_end_rel_micros
        for index, output in enumerate(node_stats.output):
          # Output 0 is named after the node itself; later outputs get a
          # ':<index>' suffix.
          if index:
            output_name = '%s:%d' % (node_name, index)
          else:
            output_name = node_name

          allocation = output.tensor_description.allocation_description
          num_bytes = allocation.requested_bytes
          allocator_name = allocation.allocator_name
          tensor = self._produce_tensor(output_name, start_time, tensors_pid,
                                        allocator_name, num_bytes)
          tensor.add_ref(start_time)
          tensor.add_unref(end_time)
          self._flow_starts[output_name] = (end_time, device_pid, tid)

          if show_memory:
            self._chrome_trace.emit_obj_create('Tensor', output_name,
                                               start_time, tensors_pid, tid,
                                               tensor.object_id)
            self._emit_tensor_snapshot(tensor, end_time - 1, tensors_pid, tid,
                                       output)

  def _show_compute(self, show_dataflow):
    """Visualize the computation activity.

    Args:
      show_dataflow: If True, emit flow events connecting the producer and
        consumer of each tensor that is consumed on a different pid/tid.
    """
    for dev_stats in self._step_stats.dev_stats:
      device_name = dev_stats.device
      device_pid = self._device_pids[device_name]
      is_gputrace = self._is_gputrace_device(device_name)

      for node_stats in dev_stats.node_stats:
        tid = node_stats.thread_id
        start_time = node_stats.all_start_micros
        end_time = node_stats.all_start_micros + node_stats.all_end_rel_micros
        self._emit_op(node_stats, device_pid, is_gputrace)

        if is_gputrace or node_stats.node_name == 'RecvTensor':
          continue

        _, _, inputs = self._parse_op_label(node_stats.timeline_label)
        for input_name in inputs:
          if input_name not in self._tensors:
            # This can happen when partitioning has inserted a Send/Recv.
            # We remove the numeric suffix so that the dataflow appears to
            # come from the original node.  Ideally, the StepStats would
            # contain logging for the Send and Recv nodes.
            index = input_name.rfind('/_')
            if index > 0:
              input_name = input_name[:index]

          if input_name in self._tensors:
            tensor = self._tensors[input_name]
            tensor.add_ref(start_time)
            tensor.add_unref(end_time - 1)

            if show_dataflow:
              # We use a different flow ID for every graph edge.
              create_time, create_pid, create_tid = self._flow_starts[
                  input_name]
              # Don't add flows when producer and consumer ops are on the same
              # pid/tid since the horizontal arrows clutter the visualization.
              if create_pid != device_pid or create_tid != tid:
                flow_id = self._alloc_flow_id()
                self._chrome_trace.emit_flow_start(input_name, create_time,
                                                   create_pid, create_tid,
                                                   flow_id)
                self._chrome_trace.emit_flow_end(input_name, start_time,
                                                 device_pid, tid, flow_id)
          else:
            logging.vlog(1, 'Can\'t find tensor %s - removed by CSE?',
                         input_name)

  def _show_memory_counters(self):
    """Produce a counter series for each memory allocator.

    Also emits an object deletion event for every tracked tensor and stores
    the per-allocator maximum in 'self._allocator_maximums'.
    """
    # Iterate over all tensor trackers to build a list of allocations and
    # frees for each allocator. Then sort the lists and emit a cumulative
    # counter series for each allocator.
    allocations = {}
    for name, tensor in self._tensors.items():
      self._chrome_trace.emit_obj_delete('Tensor', name, tensor.last_unref,
                                         tensor.pid, 0, tensor.object_id)
      num_bytes = tensor.num_bytes
      alloc_list = allocations.setdefault(tensor.allocator, [])
      alloc_list.append((tensor.create_time, num_bytes, name))
      alloc_list.append((tensor.last_unref, -num_bytes, name))

    alloc_maxes = {}

    # Generate a counter series showing total allocations for each allocator.
    for allocator, alloc_list in allocations.items():
      # Sorting the (time, bytes, name) tuples orders events by timestamp and,
      # within one timestamp, places frees (negative sizes) before
      # allocations.
      alloc_list.sort()
      total_bytes = 0
      alloc_tensor_set = set()
      alloc_maxes[allocator] = AllocationMaximum(
          timestamp=0, num_bytes=0, tensors=set())
      for timestamp, num_bytes, name in alloc_list:
        total_bytes += num_bytes
        if num_bytes < 0:
          alloc_tensor_set.discard(name)
        else:
          alloc_tensor_set.add(name)

        if total_bytes > alloc_maxes[allocator].num_bytes:
          # Snapshot the live set; it keeps changing as the loop continues.
          alloc_maxes[allocator] = AllocationMaximum(
              timestamp=timestamp,
              num_bytes=total_bytes,
              tensors=set(alloc_tensor_set))

        self._chrome_trace.emit_counter('Memory', allocator,
                                        self._allocators_pid, timestamp,
                                        allocator, total_bytes)
    self._allocator_maximums = alloc_maxes

  def analyze_step_stats(self, show_dataflow=True, show_memory=True):
    """Analyzes the step stats and records the resulting trace events.

    Args:
      show_dataflow: (Optional.) If True, add flow events to the trace
        connecting producers and consumers of tensors.
      show_memory: (Optional.) If True, add object snapshot events to the trace
        showing the sizes and lifetimes of tensors, and compute the maximum
        allocation for each allocator.

    Returns:
      A 'StepStatsAnalysis' object holding the trace formatter and the
      per-allocator maximums (empty unless 'show_memory' is True).
    """
    self._allocate_pids()
    self._assign_lanes()
    self._analyze_tensors(show_memory)
    self._show_compute(show_dataflow)
    if show_memory:
      self._show_memory_counters()
    return StepStatsAnalysis(
        chrome_trace=self._chrome_trace,
        allocator_maximums=self._allocator_maximums)

  def generate_chrome_trace_format(self, show_dataflow=True, show_memory=False):
    """Produces a trace in Chrome Trace Format.

    Args:
      show_dataflow: (Optional.) If True, add flow events to the trace
        connecting producers and consumers of tensors.
      show_memory: (Optional.) If True, add object snapshot events to the trace
        showing the sizes and lifetimes of tensors.

    Returns:
      A JSON formatted string in Chrome Trace format.
    """
    step_stats_analysis = self.analyze_step_stats(
        show_dataflow=show_dataflow, show_memory=show_memory)

    return step_stats_analysis.chrome_trace.format_to_string(pretty=True)
637