1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
2# For details: https://bitbucket.org/ned/coveragepy/src/default/NOTICE.txt
3
4"""Raw data collector for coverage.py."""
5
6import dis
7import sys
8
9from coverage import env
10
11# We need the YIELD_VALUE opcode below, in a comparison-friendly form.
12YIELD_VALUE = dis.opmap['YIELD_VALUE']
13if env.PY2:
14    YIELD_VALUE = chr(YIELD_VALUE)
15
16
17class PyTracer(object):
18    """Python implementation of the raw data tracer."""
19
20    # Because of poor implementations of trace-function-manipulating tools,
21    # the Python trace function must be kept very simple.  In particular, there
22    # must be only one function ever set as the trace function, both through
23    # sys.settrace, and as the return value from the trace function.  Put
24    # another way, the trace function must always return itself.  It cannot
25    # swap in other functions, or return None to avoid tracing a particular
26    # frame.
27    #
28    # The trace manipulator that introduced this restriction is DecoratorTools,
29    # which sets a trace function, and then later restores the pre-existing one
30    # by calling sys.settrace with a function it found in the current frame.
31    #
32    # Systems that use DecoratorTools (or similar trace manipulations) must use
33    # PyTracer to get accurate results.  The command-line --timid argument is
34    # used to force the use of this tracer.
35
36    def __init__(self):
37        # Attributes set from the collector:
38        self.data = None
39        self.trace_arcs = False
40        self.should_trace = None
41        self.should_trace_cache = None
42        self.warn = None
43        # The threading module to use, if any.
44        self.threading = None
45
46        self.cur_file_dict = []
47        self.last_line = [0]
48
49        self.data_stack = []
50        self.last_exc_back = None
51        self.last_exc_firstlineno = 0
52        self.thread = None
53        self.stopped = False
54
55    def __repr__(self):
56        return "<PyTracer at 0x{0:0x}: {1} lines in {2} files>".format(
57            id(self),
58            sum(len(v) for v in self.data.values()),
59            len(self.data),
60        )
61
62    def _trace(self, frame, event, arg_unused):
63        """The trace function passed to sys.settrace."""
64
65        if self.stopped:
66            return
67
68        if self.last_exc_back:
69            if frame == self.last_exc_back:
70                # Someone forgot a return event.
71                if self.trace_arcs and self.cur_file_dict:
72                    pair = (self.last_line, -self.last_exc_firstlineno)
73                    self.cur_file_dict[pair] = None
74                self.cur_file_dict, self.last_line = self.data_stack.pop()
75            self.last_exc_back = None
76
77        if event == 'call':
78            # Entering a new function context.  Decide if we should trace
79            # in this file.
80            self.data_stack.append((self.cur_file_dict, self.last_line))
81            filename = frame.f_code.co_filename
82            disp = self.should_trace_cache.get(filename)
83            if disp is None:
84                disp = self.should_trace(filename, frame)
85                self.should_trace_cache[filename] = disp
86
87            self.cur_file_dict = None
88            if disp.trace:
89                tracename = disp.source_filename
90                if tracename not in self.data:
91                    self.data[tracename] = {}
92                self.cur_file_dict = self.data[tracename]
93            # The call event is really a "start frame" event, and happens for
94            # function calls and re-entering generators.  The f_lasti field is
95            # -1 for calls, and a real offset for generators.  Use -1 as the
96            # line number for calls, and the real line number for generators.
97            self.last_line = -1 if (frame.f_lasti < 0) else frame.f_lineno
98        elif event == 'line':
99            # Record an executed line.
100            if self.cur_file_dict is not None:
101                lineno = frame.f_lineno
102                if self.trace_arcs:
103                    self.cur_file_dict[(self.last_line, lineno)] = None
104                else:
105                    self.cur_file_dict[lineno] = None
106                self.last_line = lineno
107        elif event == 'return':
108            if self.trace_arcs and self.cur_file_dict:
109                # Record an arc leaving the function, but beware that a
110                # "return" event might just mean yielding from a generator.
111                bytecode = frame.f_code.co_code[frame.f_lasti]
112                if bytecode != YIELD_VALUE:
113                    first = frame.f_code.co_firstlineno
114                    self.cur_file_dict[(self.last_line, -first)] = None
115            # Leaving this function, pop the filename stack.
116            self.cur_file_dict, self.last_line = self.data_stack.pop()
117        elif event == 'exception':
118            self.last_exc_back = frame.f_back
119            self.last_exc_firstlineno = frame.f_code.co_firstlineno
120        return self._trace
121
122    def start(self):
123        """Start this Tracer.
124
125        Return a Python function suitable for use with sys.settrace().
126
127        """
128        if self.threading:
129            self.thread = self.threading.currentThread()
130        sys.settrace(self._trace)
131        self.stopped = False
132        return self._trace
133
134    def stop(self):
135        """Stop this Tracer."""
136        self.stopped = True
137        if self.threading and self.thread != self.threading.currentThread():
138            # Called on a different thread than started us: we can't unhook
139            # ourselves, but we've set the flag that we should stop, so we
140            # won't do any more tracing.
141            return
142
143        if self.warn:
144            if sys.gettrace() != self._trace:
145                msg = "Trace function changed, measurement is likely wrong: %r"
146                self.warn(msg % (sys.gettrace(),))
147
148        sys.settrace(None)
149
150    def get_stats(self):
151        """Return a dictionary of statistics, or None."""
152        return None
153