# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for the trace_data module (trace containers and builder)."""

import base64
import json
import os
import tempfile
import unittest

from py_utils import tempfile_ext
from tracing.trace_data import trace_data


class TraceDataTest(unittest.TestCase):
  """Checks trace lookup on data built by CreateFromRawChromeEvents."""

  def testHasTracesForChrome(self):
    """Data created from raw Chrome events reports a Chrome trace part."""
    d = trace_data.CreateFromRawChromeEvents([{'ph': 'B'}])
    self.assertTrue(d.HasTracesFor(trace_data.CHROME_TRACE_PART))

  def testHasNotTracesForCpu(self):
    """Data created from raw Chrome events reports no CPU trace data."""
    d = trace_data.CreateFromRawChromeEvents([{'ph': 'B'}])
    self.assertFalse(d.HasTracesFor(trace_data.CPU_TRACE_DATA))

  def testGetTracesForChrome(self):
    """The raw events come back as one trace wrapped under 'traceEvents'."""
    d = trace_data.CreateFromRawChromeEvents([{'ph': 'B'}])
    ts = d.GetTracesFor(trace_data.CHROME_TRACE_PART)
    self.assertEqual(len(ts), 1)
    self.assertEqual(ts[0], {'traceEvents': [{'ph': 'B'}]})

  def testGetNoTracesForCpu(self):
    """Asking for a part that was never added yields an empty list."""
    d = trace_data.CreateFromRawChromeEvents([{'ph': 'B'}])
    ts = d.GetTracesFor(trace_data.CPU_TRACE_DATA)
    self.assertEqual(ts, [])


class TraceDataBuilderTest(unittest.TestCase):
  """Checks adding, serializing and cleaning up data via TraceDataBuilder."""

  def testAddTraceDataAndSerialize(self):
    """Serializing a builder with one trace writes a non-empty file."""
    with tempfile_ext.TemporaryFileName() as trace_path:
      with trace_data.TraceDataBuilder() as builder:
        builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                            {'traceEvents': [1, 2, 3]})
        builder.Serialize(trace_path)
        self.assertTrue(os.path.exists(trace_path))
        self.assertGreater(os.stat(trace_path).st_size, 0)  # File not empty.

  def testAddTraceForRaisesWithInvalidPart(self):
    """Passing a plain string as the trace part raises TypeError."""
    with trace_data.TraceDataBuilder() as builder:
      with self.assertRaises(TypeError):
        builder.AddTraceFor('not_a_trace_part', {})

  def testAddTraceWithUnstructuredData(self):
    """A string payload is accepted when allow_unstructured=True is given."""
    with trace_data.TraceDataBuilder() as builder:
      builder.AddTraceFor(trace_data.TELEMETRY_PART, 'unstructured trace',
                          allow_unstructured=True)

  def testAddTraceRaisesWithImplicitUnstructuredData(self):
    """A string payload without allow_unstructured raises ValueError."""
    with trace_data.TraceDataBuilder() as builder:
      with self.assertRaises(ValueError):
        builder.AddTraceFor(trace_data.TELEMETRY_PART, 'unstructured trace')

  def testAddTraceFileFor(self):
    """AddTraceFileFor takes over an existing JSON file and its contents.

    The source file must no longer exist at its original path once it has
    been added, and the data read back from the builder must match what was
    written to the file.
    """
    original_data = {'msg': 'The answer is 42'}
    # NamedTemporaryFile defaults to binary mode ('w+b'), but json.dump emits
    # text; open in text mode so the write works on Python 3 as well as 2.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix='.json', mode='w+') as source:
      json.dump(original_data, source)
    with trace_data.TraceDataBuilder() as builder:
      builder.AddTraceFileFor(trace_data.CHROME_TRACE_PART, source.name)
      self.assertFalse(os.path.exists(source.name))
      out_data = builder.AsData().GetTraceFor(trace_data.CHROME_TRACE_PART)

    self.assertEqual(original_data, out_data)

  def testOpenTraceHandleFor(self):
    """JSON written through a trace handle is readable from the builder."""
    original_data = {'msg': 'The answer is 42'}
    with trace_data.TraceDataBuilder() as builder:
      with builder.OpenTraceHandleFor(
          trace_data.CHROME_TRACE_PART, suffix='.json') as handle:
        # The compressed-data test below writes raw bytes to the same kind of
        # handle, so it is presumably opened in binary mode; encode explicitly
        # so the write is valid on Python 3 too (no-op for ASCII on Python 2).
        handle.write(json.dumps(original_data).encode('utf-8'))
      out_data = builder.AsData().GetTraceFor(trace_data.CHROME_TRACE_PART)

    # Trace handle should be cleaned up.
    self.assertFalse(os.path.exists(handle.name))
    self.assertEqual(original_data, out_data)

  def testOpenTraceHandleForCompressedData(self):
    """Gzipped JSON written through a '.json.gz' handle is read back decoded."""
    original_data = {'msg': 'The answer is 42'}
    # gzip.compress() does not work in python 2, so hardcode the encoded data.
    compressed_data = base64.b64decode(
        'H4sIAIDMblwAA6tWyi1OV7JSUArJSFVIzCsuTy1SyCxWMDFSquUCAA4QMtscAAAA')
    with trace_data.TraceDataBuilder() as builder:
      with builder.OpenTraceHandleFor(
          trace_data.CHROME_TRACE_PART, suffix='.json.gz') as handle:
        handle.write(compressed_data)
      out_data = builder.AsData().GetTraceFor(trace_data.CHROME_TRACE_PART)

    # Trace handle should be cleaned up.
    self.assertFalse(os.path.exists(handle.name))
    self.assertEqual(original_data, out_data)

  def testCantWriteAfterCleanup(self):
    """Adding a trace after CleanUpTraceData raises RuntimeError."""
    with trace_data.TraceDataBuilder() as builder:
      builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                          {'traceEvents': [1, 2, 3]})
      builder.CleanUpTraceData()
      with self.assertRaises(RuntimeError):
        builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                            {'traceEvents': [1, 2, 3]})

  def testCleanupReraisesExceptions(self):
    """An exception recorded on the builder surfaces from CleanUpTraceData."""
    with trace_data.TraceDataBuilder() as builder:
      try:
        raise Exception("test exception")  # pylint: disable=broad-except
      except Exception:  # pylint: disable=broad-except
        # Must be called while the exception is being handled.
        builder.RecordTraceDataException()
      with self.assertRaises(trace_data.TraceDataException):
        builder.CleanUpTraceData()

  def testCantWriteAfterFreeze(self):
    """Adding a trace after Freeze raises RuntimeError."""
    with trace_data.TraceDataBuilder() as builder:
      builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                          {'traceEvents': [1, 2, 3]})
      builder.Freeze()
      with self.assertRaises(RuntimeError):
        builder.AddTraceFor(trace_data.CHROME_TRACE_PART,
                            {'traceEvents': [1, 2, 3]})