#!/usr/bin/env python
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import queue
import random
import threading
from concurrent import futures

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import records
from vts.runners.host import test_runner
from vts.utils.python.common import cmd_utils
from vts.utils.python.common import list_utils

from vts.testcases.kernel.ltp import test_cases_parser
from vts.testcases.kernel.ltp import environment_requirement_checker as env_checker
from vts.testcases.kernel.ltp.shell_environment import shell_environment
from vts.testcases.kernel.ltp import ltp_enums
from vts.testcases.kernel.ltp import ltp_configs

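# Seed for random.shuffle() in RunGeneratedTestsMultiThread; a fixed seed
# keeps the shuffled multi-thread test order reproducible across runs.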
RANDOM_SEED = 0
# TCP connection timeout in seconds
TIMEOUT_TCP_IN_SECS = 180


class KernelLtpTest(base_test.BaseTestClass):
    """Runs the LTP (Linux Test Project) test cases against the Android kernel.

    Attributes:
        _dut: AndroidDevice, the device under test.
        _shell: ShellMirrorObject, shell mirror object used to execute
                commands.
        _testcases: TestcasesParser, test case input parser.
        _env: dict<string, string>, dict of environment variable key-value
              pairs.
        data_file_path: string, runner's directory where test cases are stored.
        run_staging: bool, whether to run staging tests.
        number_of_threads: int, number of threads to run in parallel. If this
                           number is set to 0, the test case will automatically
                           pick the number of available CPUs on the device. If
                           the number is less than 0, it will be set to 1. If
                           the number is greater than 0, that number of threads
                           will be created to run the tests.
    """
    _32BIT = 32
    _64BIT = 64
    _PASS = 0
    _SKIP = 1
    _FAIL = -1

    def setUpClass(self):
        """Creates a remote shell instance, and copies data files."""
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH, keys.ConfigKeys.KEY_TEST_SUITE
        ]
        self.getUserParams(required_params)

        self.run_32bit = self.getUserParam(
            ltp_enums.ConfigKeys.RUN_32BIT, default_value=True)
        self.run_64bit = self.getUserParam(
            ltp_enums.ConfigKeys.RUN_64BIT, default_value=True)
        self.run_staging = self.getUserParam(
            ltp_enums.ConfigKeys.RUN_STAGING, default_value=False)

        logging.info("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                     self.data_file_path)
        logging.info("%s: %s", keys.ConfigKeys.KEY_TEST_SUITE, self.test_suite)
        logging.info("%s: %s", ltp_enums.ConfigKeys.RUN_STAGING,
                     self.run_staging)

        self.number_of_threads = self.getUserParam(
            ltp_enums.ConfigKeys.LTP_NUMBER_OF_THREADS,
            default_value=ltp_configs.DEFAULT_NUMBER_OF_THREADS)
        logging.info("%s: %s", ltp_enums.ConfigKeys.LTP_NUMBER_OF_THREADS,
                     self.number_of_threads)

        self._dut = self.android_devices[0]
        logging.info("product_type: %s", self._dut.product_type)
        self.shell = self._dut.shell
        self.shell.SetConnTimeout(TIMEOUT_TCP_IN_SECS)

        self._requirement = env_checker.EnvironmentRequirementChecker(
            self.shell)
        self._shell_env = shell_environment.ShellEnvironment(self.shell)

        self._testcases = test_cases_parser.TestCasesParser(
            self.data_file_path, self.filterOneTest)

        self._env = {
            ltp_enums.ShellEnvKeys.TMP: ltp_configs.TMP,
            ltp_enums.ShellEnvKeys.TMPBASE: ltp_configs.TMPBASE,
            ltp_enums.ShellEnvKeys.LTPTMP: ltp_configs.LTPTMP,
            ltp_enums.ShellEnvKeys.TMPDIR: ltp_configs.TMPDIR,
            ltp_enums.ShellEnvKeys.LTP_DEV_FS_TYPE: ltp_configs.LTP_DEV_FS_TYPE,
            ltp_enums.ShellEnvKeys.LTPROOT: ltp_configs.LTPDIR,
            ltp_enums.ShellEnvKeys.PATH: ltp_configs.PATH
        }

    @property
    def shell(self):
        """Returns an object that can execute a shell command."""
        return self._shell

    @shell.setter
    def shell(self, shell):
        """Sets the shell object."""
        self._shell = shell

    def PreTestSetup(self, test_bit):
        """Setup that needs to be done before any tests run."""
        replacements = {
            '#!/bin/sh': '#!/system/bin/sh',
            '#! /bin/sh': '#!/system/bin/sh',
            '#!/bin/bash': '#!/system/bin/sh',
            '#! /bin/bash': '#!/system/bin/sh',
            'bs=1M': 'bs=1m',
            '/var/run': ltp_configs.TMP
        }
        src_host = os.path.join(self.data_file_path, 'DATA', test_bit, 'ltp')

        count = 0
        for (dirpath, dirnames, filenames) in os.walk(src_host):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                with open(filepath, 'rb') as f:
                    content = f.read()
                content_replaced = content
                for key, value in replacements.items():
                    # Files are read as bytes, so the replacement patterns
                    # must be encoded to bytes as well.
                    content_replaced = content_replaced.replace(
                        key.encode(), value.encode())
                if content_replaced != content:
                    with open(filepath, 'wb') as f:
                        f.write(content_replaced)
                    count += 1
        logging.info('Finished replacing script contents in %d files', count)

        self._report_thread_lock = threading.Lock()

    def PushFiles(self, test_bit):
        """Pushes the related files to the target device.

        Args:
            test_bit: string, 'nativetest' or 'nativetest64'
        """
        src = os.path.join(self.data_file_path, 'DATA', test_bit, 'ltp', '.')
        logging.info('Pushing files from %s to %s', src, ltp_configs.LTPDIR)
        self.shell.Execute("mkdir -p %s" % ltp_configs.LTPDIR)
        # Reset the SELinux context of the LTP directory to its default
        # (-F), recursively (-R).
        self.shell.Execute("restorecon -F -R %s" % ltp_configs.LTPDIR)
        self._dut.adb.push(src, ltp_configs.LTPDIR)
        logging.info('Finished pushing files from %s to %s', src,
                     ltp_configs.LTPDIR)

    def GetEnvp(self):
        """Generates the environment variable string required to run the tests."""
        return ' '.join("%s=%s" % (key, value)
                        for key, value in self._env.items())

    def tearDownClass(self):
        """Deletes all copied data files."""
        self.shell.Execute("rm -rf %s" % ltp_configs.LTPDIR)
        self._requirement.Cleanup()

    def Verify(self, test_case, results):
        """Interprets the test result of each test case.

        Returns:
            tuple(int, string), a tuple of an int representing test pass,
            fail, or skip, and a string giving the reason of a failed or
            skipped test
        """
        if not results:
            return (self._FAIL, "No response received. Socket timeout")

        if None in results.values():
            return (self._FAIL, "Command result is empty.")

        # For LTP test cases, we run one shell command per test case,
        # so the result should contain only one execution output.
        try:
            stdout = results[const.STDOUT][0]
            ret_code = results[const.EXIT_CODE][0]
        except IndexError as e:
            logging.exception(e)
            return (self._FAIL, "Command result is malformed.")

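        # LTP exit-code convention: TPASS (0) means the test passed, and
        # TCONF (32) means the test is not applicable to this configuration;
        # the numeric values are defined in ltp_enums.TestExitCode.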
        if (ret_code == ltp_enums.TestExitCode.TCONF and
                not test_case.is_mandatory):
            return (self._SKIP, "Incompatible test skipped: TCONF")
        elif ret_code != ltp_enums.TestExitCode.TPASS:
            return (self._FAIL,
                    "Got return code %s, test did not pass." % ret_code)
        else:
            return (self._PASS, None)

    def CheckResult(self, test_case, cmd_results, result=None, note=None):
        """Checks a test result and emits exceptions if the test failed or skipped.

        If the shell command result is not yet interpreted, self.Verify will
        be called to interpret the results.

        Args:
            test_case: test case object for the test that gave the result
            cmd_results: dict([str],[str],[int]), command results from shell.
            result: int, one of the values of _PASS, _SKIP, and _FAIL
            note: string, reason why a test failed or got skipped
        """
        asserts.assertTrue(cmd_results, "No response received. Socket timeout")

        logging.info("stdout: %s", cmd_results[const.STDOUT])
        logging.info("stderr: %s", cmd_results[const.STDERR])
        logging.info("exit_code: %s", cmd_results[const.EXIT_CODE])

        if result is None:
            result, note = self.Verify(test_case, cmd_results)
        logging.info("verify result: %s", result)
        logging.info("note: %s", note)

        asserts.skipIf(result == self._SKIP, note)
        asserts.assertEqual(result, self._PASS, note)

    def TestNBits(self, n_bit):
        """Runs all 32-bit or 64-bit LTP test cases.

        Args:
            n_bit: int, bitness
        """
        test_bit = 'nativetest'
        if n_bit == self._64BIT:
            test_bit += '64'
        self.PreTestSetup(test_bit)
        self.PushFiles(test_bit)

        is_low_mem = self._dut.getProp('ro.config.low_ram').lower() == 'true'
        if is_low_mem:
            logging.info('Device is configured as a low RAM device.')

        is_hwasan = (self._dut.getProp('ro.product.name').find('_hwasan') != -1
                     and n_bit == self._64BIT)
        if is_hwasan:
            logging.info('Running on a HWASan device.')

        test_cases = list(
            self._testcases.Load(
                ltp_configs.LTPDIR,
                n_bit,
                self.test_filter,
                run_staging=self.run_staging,
                is_low_mem=is_low_mem,
                is_hwasan=is_hwasan))

        logging.info("Checking that binaries exist for all test cases.")
        self._requirement.ltp_bin_host_path = os.path.join(
            self.data_file_path, 'DATA', test_bit, 'ltp', 'testcases', 'bin')
        self._requirement.CheckAllTestCaseExecutables(test_cases)
        logging.info("Start running %i individual tests.", len(test_cases))

        self.RunGeneratedTestsMultiThread(
            test_func=self.RunLtpOnce,
            settings=test_cases,
            args=(n_bit, ),
            name_func=self.GetTestName)

    def RunGeneratedTestsMultiThread(self, test_func, settings, args,
                                     name_func):
        """Runs LTP tests with multiple threads.

        If number_of_threads is specified to be 0 in the config file, a shell
        query will be made to the device to get the number of available CPUs.
        If number_of_threads or the number of available CPUs is 1, this
        function will call and return the parent class's regular
        runGeneratedTests function. Since some tests may compete for resources
        with each other, all the failed tests will be rerun sequentially at
        the end to confirm their failure. Also, if include_filter is not
        empty, only one thread will be running.

        Args:
            test_func: The common logic shared by all these generated test
                       cases. This function should take at least one argument,
                       which is a parameter set.
            settings: A list of strings representing parameter sets. These are
                      usually json strings that get loaded in the test_func.
            args: Iterable of additional position args to be passed to
                  test_func.
            name_func: A function that takes a test setting and generates a
                       proper test name.

        Returns:
            A list of settings that fail.
        """
        n_workers = self.number_of_threads

        if n_workers < 0:
            logging.error('Invalid setting for number of threads: < 0.')
            n_workers = 1

        # Include filter is not empty; run sequentially.
        if self.test_filter.include_filter:
            n_workers = 1

        # Number of threads is set to 0 (automatic)
        if not n_workers:
            n_workers = self._shell_env.GetDeviceNumberOfPresentCpu()
            logging.info('Number of CPUs available on device: %i', n_workers)

        # Skip the multi-thread version if only 1 worker is available
        if n_workers == 1:
            return self.runGeneratedTests(
                test_func=test_func,
                settings=settings,
                args=args,
                name_func=name_func)

        settings_multithread = []
        settings_singlethread = []
        for test_case in settings:
            if (test_case.is_staging or test_case.testsuite in
                    ltp_configs.TEST_SUITES_REQUIRE_SINGLE_THREAD_MODE):
                settings_singlethread.append(test_case)
            else:
                settings_multithread.append(test_case)

        failed_tests = self.runGeneratedTests(
            test_func=test_func,
            settings=settings_singlethread,
            args=args,
            name_func=name_func)

        # Shuffle the tests to reduce the probability of resource competition
        random.seed(RANDOM_SEED)
        random.shuffle(settings_multithread)

        # Create a queue for thread workers to pull tasks from. A plain loop
        # is used instead of map() because map() is lazy in Python 3 and
        # would never actually enqueue anything.
        q = queue.Queue()
        for test_case in settings_multithread:
            q.put(test_case)

        # Create individual shell sessions for the thread workers
        for i in range(n_workers):
            self._dut.shell.InvokeTerminal("shell_thread_{}".format(i))

        failed_multithread_tests = set()
        with futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
            fs = [
                executor.submit(self.RunLtpWorker, q, args, name_func, i)
                for i in range(n_workers)
            ]

            for f in fs:
                failed_multithread_tests.update(f.result())

        for test_case in failed_multithread_tests:
            logging.info(
                "Test case %s failed during multi-thread run, rerunning...",
                test_case)

        # Finally, rerun all failed tests sequentially to confirm their
        # failure.
        failed_tests.extend(
            self.runGeneratedTests(
                test_func=test_func,
                settings=failed_multithread_tests,
                args=args,
                name_func=name_func))

        return failed_tests

    def RunLtpWorker(self, testcases, args, name_func, id):
        """Worker thread that runs one LTP test case at a time."""
        shell = getattr(self._dut.shell, "shell_thread_{}".format(id))
        shell.SetConnTimeout(TIMEOUT_TCP_IN_SECS)
        failed_tests = set()

        while True:
            test_case = None
            try:
                test_case = testcases.get(block=False)
                logging.info("Worker {} takes '{}'.".format(id, test_case))
            except queue.Empty:
                logging.info("Worker {} finished.".format(id))
                return failed_tests

            test_name = name_func(test_case, *args)

            # Check whether the test case is filtered out by base_test's
            # filtering method
            if test_case.is_filtered:
                self.InternalResultReportMultiThread(test_name, asserts.skipIf,
                                                     (True, test_case.note))
                continue
            logging.info("Worker {} starts checking requirement "
                         "for '{}'.".format(id, test_case))

            # Check test requirements
            requirement_satisfied = self._requirement.Check(test_case)
            if not requirement_satisfied:
                logging.info("Worker {} reports requirement "
                             "not satisfied for '{}'.".format(id, test_case))
                self.InternalResultReportMultiThread(test_name, asserts.skipIf,
                                                     (True, test_case.note))
                continue

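            # Build the full shell command; its illustrative shape (paths
            # depend on ltp_configs) is:
            #   export TMP=... PATH=... && cd <LTP bin dir> && <test command>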
420            cmd = "export {envp} && cd {cwd} && {commands}".format(
421                envp=self.GetEnvp(),
422                cwd=ltp_configs.LTPBINPATH,
423                commands=test_case.command)

            logging.info("Worker {} starts executing command "
                         "for '{}'.\n  Command:{}".format(id, test_case, cmd))
            cmd_results = shell.Execute(cmd)

            logging.info("Worker {} starts verifying results "
                         "for '{}'.".format(id, test_case))

            result, note = self.Verify(test_case, cmd_results)
            if result == self._FAIL:
                # Hide failed tests from the runner and put them into the
                # rerun list
                logging.info("Worker {} reports '{}' failed. Adding to "
                             "sequential job queue.".format(id, test_case))
                failed_tests.add(test_case)
            else:
                # Report skipped or passed tests to the runner
                self.InternalResultReportMultiThread(
                    test_name, self.CheckResult,
                    (test_case, cmd_results, result, note))

    def InternalResultReportMultiThread(self, test_name, function, args,
                                        **kwargs):
        """Reports a test result to the runner thread-safely.

        Runs the given function to generate a result for the runner. The
        function given should produce the same result visible to the runner
        but may not run any actual tests.

        Args:
            test_name: string, name of a test case
            function: the function to generate a test case result for the
                      runner
            args: any arguments for the function
            **kwargs: any additional keyword arguments for the runner
        """
        self._report_thread_lock.acquire()
        tr_record = records.TestResultRecord(test_name, self.test_module_name)
        self.results.requested.append(tr_record)
        try:
            self.execOneTest(test_name, function, args, **kwargs)
        finally:
            self._report_thread_lock.release()

    def GetTestName(self, test_case, n_bit):
        """Generates the VTS test name of an LTP test."""
        return "{}_{}bit".format(test_case, n_bit)

    def RunLtpOnce(self, test_case, n_bit):
        """Runs one LTP test case."""
        asserts.skipIf(test_case.is_filtered, test_case.note)
        asserts.skipIf(not self._requirement.Check(test_case), test_case.note)

        cmd = "export {envp} && cd {cwd} && {commands}".format(
            envp=self.GetEnvp(),
            cwd=ltp_configs.LTPBINPATH,
            commands=test_case.command)
        logging.info("Executing %s", cmd)
        self.CheckResult(test_case, self.shell.Execute(cmd))

    def generate64BitTests(self):
        """Runs all 64-bit LTP test cases."""
        if not self.run_64bit:
            logging.info('User specified not to run 64 bit version LTP tests.')
            return
        if not self._dut.is64Bit:
            logging.info('Target device does not support 64 bit tests.')
            return
        if self.abi_bitness is not None and self.abi_bitness != '64':
            logging.info('Skipped 64 bit tests on %s bit ABI.',
                         self.abi_bitness)
            return

        self.TestNBits(self._64BIT)

    def generate32BitTests(self):
        """Runs all 32-bit LTP test cases."""
        if not self.run_32bit:
            logging.info('User specified not to run 32 bit version LTP tests.')
            return
        if self.abi_bitness is not None and self.abi_bitness != '32':
            logging.info('Skipped 32 bit tests on %s bit ABI.',
                         self.abi_bitness)
            return

        self.TestNBits(self._32BIT)


if __name__ == "__main__":
    test_runner.main()