#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
import json
import sys
import time
import traceback
import unittest
from unittest.signals import registerResult
from unittest.util import strclass
24
# Tags that Tradefed can understand in SubprocessResultParser to get results.
# Each constant is the JSON key under which a value is written in the event
# payloads emitted by the result callbacks in this file.
_CLASSNAME_TAG = 'className'    # class name of the test being reported
_METHOD_NAME_TAG = 'testName'   # method name of the test being reported
_START_TIME_TAG = 'start_time'  # time.time() value written with TEST_STARTED
_END_TIME_TAG = 'end_time'      # time.time() value written with TEST_ENDED
_TRACE_TAG = 'trace'            # formatted traceback of a failed test
_TEST_COUNT_TAG = 'testCount'   # expected number of tests in the run
_REASON_TAG = 'reason'          # formatted traceback written with TEST_RUN_FAILED
_TIME_TAG = 'time'              # elapsed time written with TEST_RUN_ENDED
34
class TextTestResult(unittest.TextTestResult):
    """Result class that reports unittest progress as Tradefed events.

    Every callback writes one or more ``EVENT_NAME <json>`` lines to
    ``self.stream`` and then delegates to ``unittest.TextTestResult`` so the
    regular unittest bookkeeping still happens.
    """

    def _getClassName(self, test):
        """Return the class name of ``test`` as formatted by ``strclass``."""
        return strclass(test.__class__)

    def _getMethodName(self, test):
        """Return the method name of ``test``, or ``str(test)`` as fallback.

        unittest does not only pass TestCase instances to the callbacks: the
        placeholder object it uses to report class- or module-level fixture
        problems has no ``_testMethodName``. Falling back to the string form
        keeps the callback from raising AttributeError in that case.
        """
        try:
            return test._testMethodName
        except AttributeError:
            return str(test)

    def _formatTrace(self, err):
        """Return the traceback text for an ``exc_info``-style tuple.

        ``traceback.format_exception`` returns strings that already end with
        a newline, so they are concatenated directly; joining them with an
        extra ``'\\n'`` would insert a blank line between every entry.
        """
        return ''.join(traceback.format_exception(*err))

    def _testInfo(self, test):
        """Build the class/method identification shared by per-test events."""
        return {_CLASSNAME_TAG: self._getClassName(test),
                _METHOD_NAME_TAG: self._getMethodName(test)}

    def _writeEvent(self, event, resp):
        """Write one ``<event> <json payload>`` line to the stream."""
        self.stream.write('%s %s\n' % (event, json.dumps(resp)))

    def _writeTestEnded(self, test):
        """Write the TEST_ENDED event, stamped with the current time."""
        resp = {_END_TIME_TAG: time.time()}
        resp.update(self._testInfo(test))
        self._writeEvent('TEST_ENDED', resp)

    def startTestRun(self, count):
        """ Callback that marks a test run has started.

        Args:
            count: The number of expected tests.
        """
        resp = {_TEST_COUNT_TAG: count, 'runName': 'python-tradefed'}
        self._writeEvent('TEST_RUN_STARTED', resp)
        super(TextTestResult, self).startTestRun()

    def startTest(self, test):
        """ Callback that marks a test has started to run.

        Args:
            test: The test that started.
        """
        resp = {_START_TIME_TAG: time.time()}
        resp.update(self._testInfo(test))
        self._writeEvent('TEST_STARTED', resp)
        super(TextTestResult, self).startTest(test)

    def addSuccess(self, test):
        """ Callback that marks a test has finished and passed

        Args:
            test: The test that passed.
        """
        self._writeTestEnded(test)
        super(TextTestResult, self).addSuccess(test)

    def addFailure(self, test, err):
        """ Callback that marks a test has failed

        Args:
            test: The test that failed.
            err: the error generated that should be reported.
        """
        resp = self._testInfo(test)
        resp[_TRACE_TAG] = self._formatTrace(err)
        self._writeEvent('TEST_FAILED', resp)
        self._writeTestEnded(test)
        super(TextTestResult, self).addFailure(test, err)

    def addSkip(self, test, reason):
        """ Callback that marks a test was being skipped

        Args:
            test: The test being skipped.
            reason: the message generated that should be reported. It is
                forwarded to the base class only; the emitted event does not
                carry it.
        """
        self._writeEvent('TEST_IGNORED', self._testInfo(test))
        self._writeTestEnded(test)
        super(TextTestResult, self).addSkip(test, reason)

    def addExpectedFailure(self, test, err):
        """ Callback that marks a test was expected to fail and failed.

        Args:
            test: The test responsible for the error.
            err: the error generated that should be reported.
        """
        resp = self._testInfo(test)
        resp[_TRACE_TAG] = self._formatTrace(err)
        self._writeEvent('TEST_ASSUMPTION_FAILURE', resp)
        self._writeTestEnded(test)
        super(TextTestResult, self).addExpectedFailure(test, err)

    def addUnexpectedSuccess(self, test):
        """ Callback that marks a test was expected to fail but passed.

        Args:
            test: The test responsible for the unexpected success.
        """
        resp = self._testInfo(test)
        resp[_TRACE_TAG] = 'Unexpected success'
        self._writeEvent('TEST_ASSUMPTION_FAILURE', resp)
        self._writeTestEnded(test)
        super(TextTestResult, self).addUnexpectedSuccess(test)

    def addError(self, test, err):
        """ Callback that marks a run as failed because of an error.

        Only a run-level TEST_RUN_FAILED event is written here; no per-test
        event is emitted for the erroring test.

        Args:
            test: The test responsible for the error.
            err: the error generated that should be reported.
        """
        self._writeEvent('TEST_RUN_FAILED', {_REASON_TAG: self._formatTrace(err)})
        super(TextTestResult, self).addError(test, err)

    def stopTestRun(self, elapsedTime):
        """ Callback that marks the end of a test run

        Args:
            elapsedTime: The elapsed time of the run, as measured by the
                caller.
        """
        self._writeEvent('TEST_RUN_ENDED', {_TIME_TAG: elapsedTime})
        super(TextTestResult, self).stopTestRun()
145
class TfTextTestRunner(unittest.TextTestRunner):
    """ Class runner that ensure the callbacks order

    It also hands the device serial and extra options to every test that
    exposes a ``setUpDevice`` method before the run starts.
    """

    def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
                 failfast=False, buffer=False, resultclass=None, serial=None, extra_options=None):
        """ Store the Tradefed options and initialise the base runner.

        Args:
            stream, descriptions, verbosity, failfast, buffer, resultclass:
                forwarded unchanged to unittest.TextTestRunner.
            serial: value passed to each test's setUpDevice; when None no
                injection is performed.
            extra_options: value passed to each test's setUpDevice.
        """
        self.serial = serial
        self.extra_options = extra_options
        super(TfTextTestRunner, self).__init__(
            stream, descriptions, verbosity, failfast, buffer, resultclass)

    def _injectDevice(self, testSuites):
        """ Method to inject options to the base Python Tradefed class

        Walks ``testSuites`` recursively: anything iterable is treated as a
        suite and descended into, anything else is treated as a test. This
        handles arbitrarily nested suites as well as a bare test, without
        relying on the private ``_tests`` attribute.

        Args:
            testSuites: the current test holder.
        """
        if self.serial is None:
            return
        try:
            members = iter(testSuites)
        except TypeError:
            # Not iterable: this is an individual test, not a suite.
            self._injectIntoTest(testSuites)
            return
        for member in members:
            self._injectDevice(member)

    def _injectIntoTest(self, test):
        """ Call ``test.setUpDevice`` if present, otherwise log a notice.

        The attribute is looked up before the call so that an AttributeError
        raised inside ``setUpDevice`` itself propagates instead of being
        misreported as a missing implementation.
        """
        setUpDevice = getattr(test, 'setUpDevice', None)
        if setUpDevice is None:
            self.stream.writeln('Test %s does not implement _TradefedTestClass.' % test)
        else:
            setUpDevice(self.serial, self.stream, self.extra_options)

    def run(self, test):
        """ Run the given test case or test suite. Copied from unittest to replace the startTestRun
        callback

        Differences from the stock runner visible here: ``startTestRun``
        receives the test count, ``stopTestRun`` receives the elapsed time,
        and ``printErrors`` is only called when the result object has no
        ``stopTestRun``.

        Args:
            test: the test case or suite to run.

        Returns:
            The result object created by ``_makeResult``.
        """
        result = self._makeResult()
        result.failfast = self.failfast
        result.buffer = self.buffer
        registerResult(result)
        startTime = time.time()
        startTestRun = getattr(result, 'startTestRun', None)
        if startTestRun is not None:
            startTestRun(test.countTestCases())
        try:
            self._injectDevice(test)
            test(result)
        finally:
            # Always close the run, even if injection or the tests raised.
            stopTestRun = getattr(result, 'stopTestRun', None)
            if stopTestRun is not None:
                stopTestRun(time.time() - startTime)
            else:
                result.printErrors()
        timeTaken = time.time() - startTime
        self._writeSummary(result, timeTaken)
        return result

    def _writeSummary(self, result, timeTaken):
        """ Write the 'Ran N tests' line and the OK/FAILED summary line. """
        if hasattr(result, 'separator2'):
            self.stream.writeln(result.separator2)
        run = result.testsRun
        self.stream.writeln('Ran %d test%s in %.3fs' %
                            (run, 's' if run != 1 else '', timeTaken))
        self.stream.writeln()

        # Result objects lacking any of these attributes report all three
        # counters as zero.
        try:
            expectedFails = len(result.expectedFailures)
            unexpectedSuccesses = len(result.unexpectedSuccesses)
            skipped = len(result.skipped)
        except AttributeError:
            expectedFails = unexpectedSuccesses = skipped = 0

        infos = []
        if not result.wasSuccessful():
            self.stream.write('FAILED')
            failed = len(result.failures)
            errored = len(result.errors)
            if failed:
                infos.append('failures=%d' % failed)
            if errored:
                infos.append('errors=%d' % errored)
        else:
            self.stream.write('OK')
        if skipped:
            infos.append('skipped=%d' % skipped)
        if expectedFails:
            infos.append('expected failures=%d' % expectedFails)
        if unexpectedSuccesses:
            infos.append('unexpected successes=%d' % unexpectedSuccesses)
        if infos:
            self.stream.writeln(' (%s)' % (', '.join(infos),))
        else:
            self.stream.write('\n')
228