1import os 2import pprint 3import unittest 4import tempfile 5import _hotshot 6import gc 7 8from test import test_support 9 10# Silence Py3k warning 11hotshot = test_support.import_module('hotshot', deprecated=True) 12from hotshot.log import ENTER, EXIT, LINE 13from hotshot import stats 14 15 16def shortfilename(fn): 17 # We use a really shortened filename since an exact match is made, 18 # and the source may be either a Python source file or a 19 # pre-compiled bytecode file. 20 if fn: 21 return os.path.splitext(os.path.basename(fn))[0] 22 else: 23 return fn 24 25 26class UnlinkingLogReader(hotshot.log.LogReader): 27 """Extend the LogReader so the log file is unlinked when we're 28 done with it.""" 29 30 def __init__(self, logfn): 31 self.__logfn = logfn 32 hotshot.log.LogReader.__init__(self, logfn) 33 34 def next(self, index=None): 35 try: 36 return hotshot.log.LogReader.next(self) 37 except StopIteration: 38 self.close() 39 os.unlink(self.__logfn) 40 raise 41 42 43class HotShotTestCase(unittest.TestCase): 44 def new_profiler(self, lineevents=0, linetimings=1): 45 self.logfn = test_support.TESTFN 46 return hotshot.Profile(self.logfn, lineevents, linetimings) 47 48 def get_logreader(self): 49 return UnlinkingLogReader(self.logfn) 50 51 def get_events_wotime(self): 52 L = [] 53 for event in self.get_logreader(): 54 what, (filename, lineno, funcname), tdelta = event 55 L.append((what, (shortfilename(filename), lineno, funcname))) 56 return L 57 58 def check_events(self, expected): 59 events = self.get_events_wotime() 60 if events != expected: 61 self.fail( 62 "events did not match expectation; got:\n%s\nexpected:\n%s" 63 % (pprint.pformat(events), pprint.pformat(expected))) 64 65 def run_test(self, callable, events, profiler=None): 66 if profiler is None: 67 profiler = self.new_profiler() 68 self.assertTrue(not profiler._prof.closed) 69 profiler.runcall(callable) 70 self.assertTrue(not profiler._prof.closed) 71 profiler.close() 72 self.assertTrue(profiler._prof.closed) 73 self.check_events(events) 74 75 def test_addinfo(self): 76 def f(p): 77 p.addinfo("test-key", "test-value") 78 profiler = self.new_profiler() 79 profiler.runcall(f, profiler) 80 profiler.close() 81 log = self.get_logreader() 82 info = log._info 83 list(log) 84 self.assertTrue(info["test-key"] == ["test-value"]) 85 86 def test_line_numbers(self): 87 def f(): 88 y = 2 89 x = 1 90 def g(): 91 f() 92 f_lineno = f.func_code.co_firstlineno 93 g_lineno = g.func_code.co_firstlineno 94 events = [(ENTER, ("test_hotshot", g_lineno, "g")), 95 (LINE, ("test_hotshot", g_lineno+1, "g")), 96 (ENTER, ("test_hotshot", f_lineno, "f")), 97 (LINE, ("test_hotshot", f_lineno+1, "f")), 98 (LINE, ("test_hotshot", f_lineno+2, "f")), 99 (EXIT, ("test_hotshot", f_lineno, "f")), 100 (EXIT, ("test_hotshot", g_lineno, "g")), 101 ] 102 self.run_test(g, events, self.new_profiler(lineevents=1)) 103 104 def test_start_stop(self): 105 # Make sure we don't return NULL in the start() and stop() 106 # methods when there isn't an error. Bug in 2.2 noted by 107 # Anthony Baxter. 108 profiler = self.new_profiler() 109 profiler.start() 110 profiler.stop() 111 profiler.close() 112 os.unlink(self.logfn) 113 114 def test_bad_sys_path(self): 115 import sys 116 import os 117 orig_path = sys.path 118 coverage = hotshot._hotshot.coverage 119 try: 120 # verify we require a list for sys.path 121 sys.path = 'abc' 122 self.assertRaises(RuntimeError, coverage, test_support.TESTFN) 123 # verify that we require sys.path exists 124 del sys.path 125 self.assertRaises(RuntimeError, coverage, test_support.TESTFN) 126 finally: 127 sys.path = orig_path 128 if os.path.exists(test_support.TESTFN): 129 os.remove(test_support.TESTFN) 130 131 def test_logreader_eof_error(self): 132 emptyfile = tempfile.NamedTemporaryFile() 133 try: 134 self.assertRaises((IOError, EOFError), _hotshot.logreader, 135 emptyfile.name) 136 finally: 137 emptyfile.close() 138 gc.collect() 139 140 def test_load_stats(self): 141 def start(prof): 142 prof.start() 143 # Make sure stats can be loaded when start and stop of profiler 144 # are not executed in the same stack frame. 145 profiler = self.new_profiler() 146 start(profiler) 147 profiler.stop() 148 profiler.close() 149 stats.load(self.logfn) 150 os.unlink(self.logfn) 151 152 def test_large_info(self): 153 p = self.new_profiler() 154 self.assertRaises(ValueError, p.addinfo, "A", "A" * 0xfceb) 155 156 157def test_main(): 158 test_support.run_unittest(HotShotTestCase) 159 160 161if __name__ == "__main__": 162 test_main() 163