1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import base64
6import json
7import os
8import tempfile
9import unittest
10
11from py_utils import tempfile_ext
12from tracing.trace_data import trace_data
13
class TraceDataTest(unittest.TestCase):
  """Checks per-part trace lookups on data built from raw Chrome events."""

  def _CreateChromeOnlyData(self):
    """Returns trace data created from a single raw Chrome 'B' event."""
    raw_events = [{'ph': 'B'}]
    return trace_data.CreateFromRawChromeEvents(raw_events)

  def testHasTracesForChrome(self):
    """The Chrome part is reported as present."""
    data = self._CreateChromeOnlyData()
    has_chrome_traces = data.HasTracesFor(trace_data.CHROME_TRACE_PART)
    self.assertTrue(has_chrome_traces)

  def testHasNotTracesForCpu(self):
    """The CPU part is reported as absent."""
    data = self._CreateChromeOnlyData()
    has_cpu_traces = data.HasTracesFor(trace_data.CPU_TRACE_DATA)
    self.assertFalse(has_cpu_traces)

  def testGetTracesForChrome(self):
    """Exactly one Chrome trace comes back, wrapping the raw events."""
    expected_trace = {'traceEvents': [{'ph': 'B'}]}
    data = self._CreateChromeOnlyData()
    chrome_traces = data.GetTracesFor(trace_data.CHROME_TRACE_PART)
    self.assertEqual(len(chrome_traces), 1)
    self.assertEqual(chrome_traces[0], expected_trace)

  def testGetNoTracesForCpu(self):
    """Asking for the CPU part yields an empty list."""
    data = self._CreateChromeOnlyData()
    cpu_traces = data.GetTracesFor(trace_data.CPU_TRACE_DATA)
    self.assertEqual(cpu_traces, [])
33
34
class TraceDataBuilderTest(unittest.TestCase):
  """Tests adding traces to a TraceDataBuilder and reading them back."""

  def testAddTraceDataAndSerialize(self):
    """Serializing a builder holding one trace writes a non-empty file."""
    with tempfile_ext.TemporaryFileName() as trace_path:
      with trace_data.TraceDataBuilder() as builder:
        builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                            {'traceEvents': [1, 2, 3]})
        builder.Serialize(trace_path)
        self.assertTrue(os.path.exists(trace_path))
        self.assertGreater(os.stat(trace_path).st_size, 0)  # File not empty.

  def testAddTraceForRaisesWithInvalidPart(self):
    """A plain string is rejected as a trace part with TypeError."""
    with trace_data.TraceDataBuilder() as builder:
      with self.assertRaises(TypeError):
        builder.AddTraceFor('not_a_trace_part', {})

  def testAddTraceWithUnstructuredData(self):
    """String data is accepted when allow_unstructured=True is passed."""
    with trace_data.TraceDataBuilder() as builder:
      builder.AddTraceFor(trace_data.TELEMETRY_PART, 'unstructured trace',
                          allow_unstructured=True)

  def testAddTraceRaisesWithImplicitUnstructuredData(self):
    """String data without allow_unstructured is rejected with ValueError."""
    with trace_data.TraceDataBuilder() as builder:
      with self.assertRaises(ValueError):
        builder.AddTraceFor(trace_data.TELEMETRY_PART, 'unstructured trace')

  def testAddTraceFileFor(self):
    """A JSON file given to the builder is removed and its data readable."""
    original_data = {'msg': 'The answer is 42'}
    # NamedTemporaryFile defaults to binary mode ('w+b') but json.dump emits
    # str, so open in text mode to work on Python 3 as well as Python 2.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix='.json', mode='w') as source:
      json.dump(original_data, source)
    with trace_data.TraceDataBuilder() as builder:
      builder.AddTraceFileFor(trace_data.CHROME_TRACE_PART, source.name)
      # The source file should no longer be at its original location.
      self.assertFalse(os.path.exists(source.name))
      out_data = builder.AsData().GetTraceFor(trace_data.CHROME_TRACE_PART)

    self.assertEqual(original_data, out_data)

  def testOpenTraceHandleFor(self):
    """JSON written through a trace handle is readable as trace data."""
    original_data = {'msg': 'The answer is 42'}
    with trace_data.TraceDataBuilder() as builder:
      with builder.OpenTraceHandleFor(
          trace_data.CHROME_TRACE_PART, suffix='.json') as handle:
        # Write bytes, as testOpenTraceHandleForCompressedData does with the
        # same kind of handle; a str write cannot also succeed on Python 3.
        handle.write(json.dumps(original_data).encode('utf-8'))
      out_data = builder.AsData().GetTraceFor(trace_data.CHROME_TRACE_PART)

    # Trace handle should be cleaned up.
    self.assertFalse(os.path.exists(handle.name))
    self.assertEqual(original_data, out_data)

  def testOpenTraceHandleForCompressedData(self):
    """Gzipped JSON written through a '.json.gz' handle is decoded on read."""
    original_data = {'msg': 'The answer is 42'}
    # gzip.compress() does not work in python 2, so hardcode the encoded data.
    compressed_data = base64.b64decode(
        'H4sIAIDMblwAA6tWyi1OV7JSUArJSFVIzCsuTy1SyCxWMDFSquUCAA4QMtscAAAA')
    with trace_data.TraceDataBuilder() as builder:
      with builder.OpenTraceHandleFor(
          trace_data.CHROME_TRACE_PART, suffix='.json.gz') as handle:
        handle.write(compressed_data)
      out_data = builder.AsData().GetTraceFor(trace_data.CHROME_TRACE_PART)

    # Trace handle should be cleaned up.
    self.assertFalse(os.path.exists(handle.name))
    self.assertEqual(original_data, out_data)

  def testCantWriteAfterCleanup(self):
    """Adding a trace after CleanUpTraceData() raises RuntimeError."""
    with trace_data.TraceDataBuilder() as builder:
      builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                          {'traceEvents': [1, 2, 3]})
      builder.CleanUpTraceData()
      with self.assertRaises(RuntimeError):
        builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                            {'traceEvents': [1, 2, 3]})

  def testCleanupReraisesExceptions(self):
    """A recorded exception resurfaces as TraceDataException on cleanup."""
    with trace_data.TraceDataBuilder() as builder:
      try:
        raise Exception("test exception") # pylint: disable=broad-except
      except Exception: # pylint: disable=broad-except
        # Record the in-flight exception with the builder.
        builder.RecordTraceDataException()
      with self.assertRaises(trace_data.TraceDataException):
        builder.CleanUpTraceData()

  def testCantWriteAfterFreeze(self):
    """Adding a trace after Freeze() raises RuntimeError."""
    with trace_data.TraceDataBuilder() as builder:
      builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                          {'traceEvents': [1, 2, 3]})
      builder.Freeze()
      with self.assertRaises(RuntimeError):
        builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                            {'traceEvents': [1, 2, 3]})
124