1#
2# Copyright (C) 2016 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import os
18import logging
19import itertools
20
21from vts.runners.host import const
22from vts.testcases.kernel.ltp import ltp_configs
23from vts.testcases.kernel.ltp import ltp_enums
24from vts.testcases.kernel.ltp import test_case
25from vts.testcases.kernel.ltp.configs import stable_tests
26from vts.testcases.kernel.ltp.configs import disabled_tests
27from vts.utils.python.common import filter_utils
28
29
30class TestCasesParser(object):
31    """Load a ltp vts testcase definition file and parse it into a generator.
32
33    Attributes:
34        _data_path: string, the vts data path on host side
35        _filter_func: function, a filter method that will emit exception if a test is filtered
36        _ltp_tests_filter: list of string, filter for tests that are stable and disabled
37    """
38
39    def __init__(self, data_path, filter_func):
40        self._data_path = data_path
41        self._filter_func = filter_func
42        self._ltp_tests_filter = filter_utils.Filter(
43            [ i[0] for i in stable_tests.STABLE_TESTS ],
44            disabled_tests.DISABLED_TESTS,
45            enable_regex=True)
46        self._ltp_tests_filter.ExpandBitness()
47
48    def ValidateDefinition(self, line):
49        """Validate a tab delimited test case definition.
50
51        Will check whether the given line of definition has three parts
52        separated by tabs.
53        It will also trim leading and ending white spaces for each part
54        in returned tuple (if valid).
55
56        Returns:
57            A tuple in format (test suite, test name, test command) if
58            definition is valid. None otherwise.
59        """
60        items = [
61            item.strip()
62            for item in line.split(ltp_enums.Delimiters.TESTCASE_DEFINITION)
63        ]
64        if not len(items) == 3 or not items:
65            return None
66        else:
67            return items
68
69    def Load(self,
70             ltp_dir,
71             n_bit,
72             test_filter,
73             run_staging=False,
74             is_low_mem=False,
75             is_hwasan=False):
76        """Read the definition file and yields a TestCase generator.
77
78        Args:
79            ltp_dir: string, directory that contains ltp binaries and scripts
80            n_bit: int, bitness
81            test_filter: Filter object, test name filter from base_test
82            run_staging: bool, whether to use staging configuration
83            is_low_mem: bool, whether to use low memory device configuration
84        """
85        scenario_groups = (ltp_configs.TEST_SUITES_LOW_MEM
86                           if is_low_mem else ltp_configs.TEST_SUITES)
87        logging.info('LTP scenario groups: %s', scenario_groups)
88
89        run_scritp = self.GenerateLtpRunScript(scenario_groups, is_hwasan=is_hwasan)
90
91        for line in run_scritp:
92            items = self.ValidateDefinition(line)
93            if not items:
94                continue
95
96            testsuite, testname, command = items
97            if is_low_mem and testsuite.endswith(
98                    ltp_configs.LOW_MEMORY_SCENARIO_GROUP_SUFFIX):
99                testsuite = testsuite[:-len(
100                    ltp_configs.LOW_MEMORY_SCENARIO_GROUP_SUFFIX)]
101
102            # Tests failed to build will have prefix "DISABLED_"
103            if testname.startswith("DISABLED_"):
104                logging.info("[Parser] Skipping test case {}-{}. Reason: "
105                             "not built".format(testsuite, testname))
106                continue
107
108            # Some test cases contain semicolons in their commands,
109            # and we replace them with &&
110            command = command.replace(';', '&&')
111
112            # Some test cases have hardcoded "/tmp" in the command
113            # we replace that with ltp_configs.TMPDIR
114            command = command.replace('/tmp', ltp_configs.TMPDIR)
115
116            testcase = test_case.TestCase(
117                testsuite=testsuite, testname=testname, command=command)
118
119            test_display_name = "{}_{}bit".format(str(testcase), n_bit)
120
121            # Check runner's base_test filtering method
122            try:
123                self._filter_func(test_display_name)
124            except:
125                logging.info("[Parser] Skipping test case %s. Reason: "
126                             "filtered" % testcase.fullname)
127                testcase.is_filtered = True
128                testcase.note = "filtered"
129
130            # For skipping tests that are not designed or ready for Android,
131            # check for bit specific test in disabled list as well as non-bit specific
132            if ((self._ltp_tests_filter.IsInExcludeFilter(str(testcase)) or
133                 self._ltp_tests_filter.IsInExcludeFilter(test_display_name)) and
134                    not test_filter.IsInIncludeFilter(test_display_name)):
135                logging.info("[Parser] Skipping test case %s. Reason: "
136                             "disabled" % testcase.fullname)
137                continue
138
139            # For separating staging tests from stable tests
140            if not self._ltp_tests_filter.IsInIncludeFilter(test_display_name):
141                if not run_staging and not test_filter.IsInIncludeFilter(
142                        test_display_name):
143                    # Skip staging tests in stable run
144                    continue
145                else:
146                    testcase.is_staging = True
147                    testcase.note = "staging"
148            else:
149                if run_staging:
150                    # Skip stable tests in staging run
151                    continue
152
153            if not testcase.is_staging:
154                for x in stable_tests.STABLE_TESTS:
155                    if x[0] == test_display_name and x[1]:
156                        testcase.is_mandatory = True
157                        break
158
159            logging.info("[Parser] Adding test case %s." % testcase.fullname)
160            yield testcase
161
162    def ReadCommentedTxt(self, filepath):
163        '''Read a lines of a file that are not commented by #.
164
165        Args:
166            filepath: string, path of file to read
167
168        Returns:
169            A set of string representing non-commented lines in given file
170        '''
171        if not filepath:
172            logging.error('Invalid file path')
173            return None
174
175        with open(filepath, 'r') as f:
176            lines_gen = (line.strip() for line in f)
177            return set(
178                line for line in lines_gen
179                if line and not line.startswith('#'))
180
181    def GenerateLtpTestCases(self, testsuite, disabled_tests_list):
182        '''Generate test cases for each ltp test suite.
183
184        Args:
185            testsuite: string, test suite name
186
187        Returns:
188            A list of string
189        '''
190        testsuite_script = os.path.join(self._data_path,
191                                        ltp_configs.LTP_RUNTEST_DIR, testsuite)
192
193        result = []
194        for line in open(testsuite_script, 'r'):
195            line = line.strip()
196            if not line or line.startswith('#'):
197                continue
198
199            testname = line.split()[0]
200            testname_prefix = ('DISABLED_'
201                               if testname in disabled_tests_list else '')
202            testname_modified = testname_prefix + testname
203
204            result.append("\t".join(
205                [testsuite, testname_modified, line[len(testname):].strip()]))
206        return result
207
208    def GenerateLtpRunScript(self, scenario_groups, is_hwasan=False):
209        '''Given a scenario group generate test case script.
210
211        Args:
212            scenario_groups: list of string, name of test scenario groups to use
213
214        Returns:
215            A list of string
216        '''
217        disabled_tests_path = os.path.join(
218            self._data_path, ltp_configs.LTP_DISABLED_BUILD_TESTS_CONFIG_PATH)
219        disabled_tests_list = self.ReadCommentedTxt(disabled_tests_path)
220        if is_hwasan:
221          disabled_tests_list = disabled_tests_list.union(disabled_tests.DISABLED_TESTS_HWASAN)
222
223        result = []
224        for testsuite in scenario_groups:
225            result.extend(
226                self.GenerateLtpTestCases(testsuite, disabled_tests_list))
227
228        #TODO(yuexima): remove duplicate
229
230        return result
231