1#!/usr/bin/env python
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import random
19import logging
20from concurrent import futures
21import queue
22import threading
23import os
24
25from vts.runners.host import asserts
26from vts.runners.host import base_test
27from vts.runners.host import const
28from vts.runners.host import keys
29from vts.runners.host import records
30from vts.runners.host import test_runner
31from vts.utils.python.common import cmd_utils
32from vts.utils.python.common import list_utils
33
34from vts.testcases.kernel.ltp import test_cases_parser
35from vts.testcases.kernel.ltp import environment_requirement_checker as env_checker
36from vts.testcases.kernel.ltp.shell_environment import shell_environment
37from vts.testcases.kernel.ltp import ltp_enums
38from vts.testcases.kernel.ltp import ltp_configs
39
40RANDOM_SEED = 0
41# TCP connection timeout
42TIMEOUT_TCP_IN_SECS = 180
43
44
45
46class KernelLtpTest(base_test.BaseTestClass):
47    """Runs the LTP (Linux Test Project) test cases against Android OS kernel.
48
49    Attributes:
50        _dut: AndroidDevice, the device under test
51        _shell: ShellMirrorObject, shell mirror object used to execute commands
52        _testcases: TestcasesParser, test case input parser
53        _env: dict<stirng, string>, dict of environment variable key value pair
54        data_file_path: string, runner's directory where test cases are stored
55        run_staging: bool, whether to run staging tests
56        number_of_threads: int, number of threads to run in parallel. If this
57                           number is set to 0, the test case will automatically
58                           pick the number of available CPUs on device. If
59                           the number is less than 0, it will be set to 1. If
60                           the number is greater than 0, that number of threads
61                           will be created to run the tests.
62    """
63    _32BIT = 32
64    _64BIT = 64
65    _PASS = 0
66    _SKIP = 1
67    _FAIL = -1
68
69    def setUpClass(self):
70        """Creates a remote shell instance, and copies data files."""
71        required_params = [
72            keys.ConfigKeys.IKEY_DATA_FILE_PATH, keys.ConfigKeys.KEY_TEST_SUITE
73        ]
74        self.getUserParams(required_params)
75
76        self.run_32bit = self.getUserParam(
77            ltp_enums.ConfigKeys.RUN_32BIT, default_value=True)
78        self.run_64bit = self.getUserParam(
79            ltp_enums.ConfigKeys.RUN_64BIT, default_value=True)
80        self.run_staging = self.getUserParam(
81            ltp_enums.ConfigKeys.RUN_STAGING, default_value=False)
82
83        logging.info("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
84                     self.data_file_path)
85        logging.info("%s: %s", keys.ConfigKeys.KEY_TEST_SUITE, self.test_suite)
86        logging.info("%s: %s", ltp_enums.ConfigKeys.RUN_STAGING,
87                     self.run_staging),
88
89        self.number_of_threads = self.getUserParam(
90            ltp_enums.ConfigKeys.LTP_NUMBER_OF_THREADS,
91            default_value=ltp_configs.DEFAULT_NUMBER_OF_THREADS)
92        logging.info("%s: %s", ltp_enums.ConfigKeys.LTP_NUMBER_OF_THREADS,
93                     self.number_of_threads)
94
95        self._dut = self.android_devices[0]
96        logging.info("product_type: %s", self._dut.product_type)
97        self.shell = self._dut.shell
98        self.shell.SetConnTimeout(TIMEOUT_TCP_IN_SECS)
99
100        self._requirement = env_checker.EnvironmentRequirementChecker(
101            self.shell)
102        self._shell_env = shell_environment.ShellEnvironment(self.shell)
103
104        self._testcases = test_cases_parser.TestCasesParser(
105            self.data_file_path, self.filterOneTest)
106
107        self._env = {
108            ltp_enums.ShellEnvKeys.TMP: ltp_configs.TMP,
109            ltp_enums.ShellEnvKeys.TMPBASE: ltp_configs.TMPBASE,
110            ltp_enums.ShellEnvKeys.LTPTMP: ltp_configs.LTPTMP,
111            ltp_enums.ShellEnvKeys.TMPDIR: ltp_configs.TMPDIR,
112            ltp_enums.ShellEnvKeys.LTP_DEV_FS_TYPE:
113            ltp_configs.LTP_DEV_FS_TYPE,
114            ltp_enums.ShellEnvKeys.LTPROOT: ltp_configs.LTPDIR,
115            ltp_enums.ShellEnvKeys.PATH: ltp_configs.PATH
116        }
117
118    @property
119    def shell(self):
120        """returns an object that can execute a shell command"""
121        return self._shell
122
123    @shell.setter
124    def shell(self, shell):
125        """Set shell object"""
126        self._shell = shell
127
128    def PreTestSetup(self, test_bit):
129        """Setups that needs to be done before any tests."""
130        replacements = {
131            '#!/bin/sh': '#!/system/bin/sh',
132            '#! /bin/sh': '#!/system/bin/sh',
133            '#!/bin/bash': '#!/system/bin/sh',
134            '#! /bin/bash': '#!/system/bin/sh',
135            'bs=1M': 'bs=1m',
136            '/var/run': ltp_configs.TMP
137        }
138        src_host = os.path.join(self.data_file_path, 'DATA', test_bit, 'ltp')
139
140        count = 0
141        for (dirpath, dirnames, filenames) in os.walk(src_host):
142            for filename in filenames:
143                filepath = os.path.join(dirpath, filename)
144                content = ''
145                with open(filepath, 'rb') as f:
146                    content = f.read()
147                content_replaced = content
148                for key in replacements:
149                    content_replaced = content_replaced.replace(
150                        key, replacements[key])
151                if content_replaced != content:
152                    with open(filepath, 'wb') as f:
153                        f.write(content_replaced)
154                    count += 1
155        logging.info('Finished replacing script contents from %s files', count)
156
157        self._report_thread_lock = threading.Lock()
158
159    def PushFiles(self, test_bit):
160        """Push the related files to target.
161
162        Args:
163            test_bit: nativetest or nativetest64
164        """
165        src = os.path.join(self.data_file_path, 'DATA', test_bit, 'ltp', '.')
166        logging.info('Pushing files from %s to %s', src, ltp_configs.LTPDIR)
167        self.shell.Execute("mkdir %s -p" % ltp_configs.LTPDIR)
168        self.shell.Execute("restorecon -F -R %s" % ltp_configs.LTPDIR)
169        self._dut.adb.push(src, ltp_configs.LTPDIR)
170        logging.info('finished pushing files from %s to %s', src,
171                     ltp_configs.LTPDIR)
172
173    def GetEnvp(self):
174        """Generate the environment variable required to run the tests."""
175        return ' '.join("%s=%s" % (key, value)
176                        for key, value in self._env.items())
177
178    def tearDownClass(self):
179        """Deletes all copied data files."""
180        self.shell.Execute("rm -rf %s" % ltp_configs.LTPDIR)
181        self._requirement.Cleanup()
182
183    def Verify(self, test_case, results):
184        """Interpret the test result of each test case.
185
186        Returns:
187            tuple(int, string), a tuple of int which represents test pass, fail
188            or skip, and string representing the reason of a failed or skipped
189            test
190        """
191        if not results:
192            return (self._FAIL, "No response received. Socket timeout")
193
194        if None in results.values():
195            return (self._FAIL, "Command result is empty.")
196
197        # For LTP test cases, we run one shell command for each test case
198        # So the result should also contains only one execution output
199        try:
200            stdout = results[const.STDOUT][0]
201            ret_code = results[const.EXIT_CODE][0]
202        except IndexError as e:
203            logging.exception(e)
204            return (self._FAIL, "Command result is malformed.")
205
206        if (ret_code == ltp_enums.TestExitCode.TCONF and
207            not test_case.is_mandatory):
208            return (self._SKIP, "Incompatible test skipped: TCONF")
209        elif (ret_code != ltp_enums.TestExitCode.TPASS):
210            return (self._FAIL,
211                    "Got return code %s, test did not pass." % ret_code)
212        else:
213            return (self._PASS, None)
214
215    def CheckResult(self, test_case, cmd_results, result=None, note=None):
216        """Check a test result and emit exceptions if test failed or skipped.
217
218        If the shell command result is not yet interpreted, self.Verify will
219        be called to interpret the results.
220
221        Args:
222            test_case: test case object for test that gave the result
223            cmd_results: dict([str],[str],[int]), command results from shell.
224            result: int, which is one of the values of _PASS, _SKIP, and _FAIL
225            note: string, reason why a test failed or get skipped
226        """
227        asserts.assertTrue(cmd_results, "No response received. Socket timeout")
228
229        logging.info("stdout: %s", cmd_results[const.STDOUT])
230        logging.info("stderr: %s", cmd_results[const.STDERR])
231        logging.info("exit_code: %s", cmd_results[const.EXIT_CODE])
232
233        if result is None:
234            result, note = self.Verify(test_case, cmd_results)
235        logging.info("verify result: %s", result)
236        logging.info("note: %s", note)
237
238        asserts.skipIf(result == self._SKIP, note)
239        asserts.assertEqual(result, self._PASS, note)
240
241    def TestNBits(self, n_bit):
242        """Runs all 32-bit or 64-bit LTP test cases.
243
244        Args:
245            n_bit: int, bitness
246        """
247        test_bit = 'nativetest'
248        if n_bit == self._64BIT:
249            test_bit += '64'
250        self.PreTestSetup(test_bit)
251        self.PushFiles(test_bit)
252
253        is_low_mem = self._dut.getProp('ro.config.low_ram').lower() == 'true'
254        if is_low_mem:
255            logging.info('Device is configured as a low RAM device.')
256
257        test_cases = list(
258            self._testcases.Load(
259                ltp_configs.LTPDIR,
260                n_bit,
261                self.test_filter,
262                run_staging=self.run_staging,
263                is_low_mem=is_low_mem))
264
265        logging.info("Checking binary exists for all test cases.")
266        self._requirement.ltp_bin_host_path = os.path.join(
267            self.data_file_path, 'DATA', test_bit, 'ltp', 'testcases', 'bin')
268        self._requirement.CheckAllTestCaseExecutables(test_cases)
269        logging.info("Start running %i individual tests." % len(test_cases))
270
271        self.RunGeneratedTestsMultiThread(
272            test_func=self.RunLtpOnce,
273            settings=test_cases,
274            args=(n_bit, ),
275            name_func=self.GetTestName)
276
277    def RunGeneratedTestsMultiThread(self, test_func, settings, args,
278                                     name_func):
279        """Run LTP tests with multi-threads.
280
281        If number_of_thread is specified to be 0 in config file, a shell query
282        will be made to the device to get the number of available CPUs. If
283        number_of_thread or number of CPUs available is 1, this function will
284        call and return parent class's regular runGeneratedTest function. Since
285        some tests may be competing resources with each other, all the failed
286        tests will be rerun sequentially in the end to confirm their failure.
287        Also, if include_filter is not empty, only 1 thread will be running.
288
289        Args:
290            test_func: The common logic shared by all these generated test
291                       cases. This function should take at least one argument,
292                       which is a parameter set.
293            settings: A list of strings representing parameter sets. These are
294                      usually json strings that get loaded in the test_func.
295            args: Iterable of additional position args to be passed to
296                  test_func.
297            name_func: A function that takes a test setting and generates a
298                       proper test name.
299
300        Returns:
301            A list of settings that fail.
302        """
303        n_workers = self.number_of_threads
304
305        if n_workers < 0:
306            logging.error('invalid setting for number of threads: < 0.')
307            n_workers = 1
308
309        # Include filter is not empty; Run in sequential.
310        if self.test_filter.include_filter:
311            n_workers = 1
312
313        # Number of thread is set to 0 (automatic)
314        if not n_workers:
315            n_workers = self._shell_env.GetDeviceNumberOfPresentCpu()
316            logging.info('Number of CPU available on device: %i', n_workers)
317
318        # Skip multithread version if only 1 worker available
319        if n_workers == 1:
320            return self.runGeneratedTests(
321                test_func=test_func,
322                settings=settings,
323                args=args,
324                name_func=name_func)
325
326        settings_multithread = []
327        settings_singlethread = []
328        for test_case in settings:
329            if (test_case.is_staging or test_case.testsuite in
330                    ltp_configs.TEST_SUITES_REQUIRE_SINGLE_THREAD_MODE):
331                settings_singlethread.append(test_case)
332            else:
333                settings_multithread.append(test_case)
334
335        failed_tests = self.runGeneratedTests(
336            test_func=test_func,
337            settings=settings_singlethread,
338            args=args,
339            name_func=name_func)
340
341        # Shuffle the tests to reduce resource competition probability
342        random.seed(RANDOM_SEED)
343        random.shuffle(settings_multithread)
344
345        # Create a queue for thread workers to pull tasks
346        q = queue.Queue()
347        map(q.put, settings_multithread)
348
349        # Create individual shell sessions for thread workers
350        for i in xrange(n_workers):
351            self._dut.shell.InvokeTerminal("shell_thread_{}".format(i))
352
353        failed_multithread_tests = set()
354        with futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
355            fs = [
356                executor.submit(self.RunLtpWorker, q, args, name_func, i)
357                for i in xrange(n_workers)
358            ]
359
360            failed_test_sets = map(futures.Future.result, fs)
361            for failed_test_set in failed_test_sets:
362                for test_case in failed_test_set:
363                    failed_multithread_tests.add(test_case)
364
365        for test_case in failed_multithread_tests:
366            logging.info(
367                "Test case %s failed during multi-thread run, rerunning...",
368                test_case)
369
370        # In the end, rerun all failed tests to confirm their failure
371        # in sequential.
372        failed_tests.extend(
373            self.runGeneratedTests(
374                test_func=test_func,
375                settings=failed_multithread_tests,
376                args=args,
377                name_func=name_func))
378
379        return failed_tests
380
381    def RunLtpWorker(self, testcases, args, name_func, id):
382        """Worker thread to run a LTP test case at a time."""
383        shell = getattr(self._dut.shell, "shell_thread_{}".format(id))
384        shell.SetConnTimeout(TIMEOUT_TCP_IN_SECS)
385        failed_tests = set()
386
387        while True:
388            test_case = None
389            try:
390                test_case = testcases.get(block=False)
391                logging.info("Worker {} takes '{}'.".format(id, test_case))
392            except:
393                logging.info("Worker {} finished.".format(id))
394                return failed_tests
395
396            test_name = name_func(test_case, *args)
397
398            # Check whether test case is filtered out by base_test's filtering method
399            if test_case.is_filtered:
400                self.InternalResultReportMultiThread(test_name, asserts.skipIf,
401                                                     (True, test_case.note))
402                continue
403            logging.info("Worker {} starts checking requirement "
404                         "for '{}'.".format(id, test_case))
405
406            # Check test requirements
407            requirement_satisfied = self._requirement.Check(test_case)
408            if not requirement_satisfied:
409                logging.info("Worker {} reports requirement "
410                             "not satisfied for '{}'.".format(id, test_case))
411                self.InternalResultReportMultiThread(test_name, asserts.skipIf,
412                                                     (True, test_case.note))
413                continue
414
415            cmd = "export {envp} && cd {cwd} && {commands}".format(
416                envp=self.GetEnvp(),
417                cwd=ltp_configs.LTPBINPATH,
418                commands=test_case.command)
419
420            logging.info("Worker {} starts executing command "
421                         "for '{}'.\n  Command:{}".format(id, test_case, cmd))
422            cmd_results = shell.Execute(cmd)
423
424            logging.info("Worker {} starts verifying results "
425                         "for '{}'.".format(id, test_case))
426
427            result, note = self.Verify(test_case, cmd_results)
428            if result == self._FAIL:
429                # Hide failed tests from the runner and put into rerun list
430                logging.info("Worker {} reports '{}' failed. Adding to "
431                             "sequential job queue.".format(id, test_case))
432                failed_tests.add(test_case)
433            else:
434                # Report skipped or passed tests to runner
435                self.InternalResultReportMultiThread(
436                    test_name, self.CheckResult,
437                    (test_case, cmd_results, result, note))
438
439    def InternalResultReportMultiThread(self, test_name, function, args,
440                                        **kwargs):
441        """Report a test result to runner thread safely.
442
443        Run the given function to generate result for the runner. The function
444        given should produce the same result visible to the runner but may not
445        run any actual tests.
446
447        Args:
448            test_name: string, name of a test case
449            function: the function to generate a test case result for runner
450            args: any arguments for the function
451            **kwargs: any additional keyword arguments for runner
452        """
453        self._report_thread_lock.acquire()
454        tr_record = records.TestResultRecord(test_name, self.test_module_name)
455        self.results.requested.append(tr_record)
456        try:
457            self.execOneTest(test_name, function, args, **kwargs)
458        except Exception as e:
459            raise e
460        finally:
461            self._report_thread_lock.release()
462
463    def GetTestName(self, test_case, n_bit):
464        "Generate the vts test name of a ltp test"
465        return "{}_{}bit".format(test_case, n_bit)
466
467    def RunLtpOnce(self, test_case, n_bit):
468        "Run one LTP test case"
469        asserts.skipIf(test_case.is_filtered, test_case.note)
470        asserts.skipIf(not self._requirement.Check(test_case), test_case.note)
471
472        cmd = "export {envp} && cd {cwd} && {commands}".format(
473            envp=self.GetEnvp(),
474            cwd=ltp_configs.LTPBINPATH,
475            commands=test_case.command)
476        logging.info("Executing %s", cmd)
477        self.CheckResult(test_case, self.shell.Execute(cmd))
478
479    def generate64BitTests(self):
480        """Runs all 64-bit LTP test cases."""
481        if not self.run_64bit:
482            logging.info('User specified not to run 64 bit version LTP tests.')
483            return
484        if not self._dut.is64Bit:
485            logging.info('Target device does not support 64 bit tests.')
486            return
487        if self.abi_bitness != None and self.abi_bitness != '64':
488            logging.info('Skipped 64 bit tests on %s bit ABI.',
489                         self.abi_bitness)
490            return
491
492        self.TestNBits(self._64BIT)
493
494    def generate32BitTests(self):
495        """Runs all 32-bit LTP test cases."""
496        if not self.run_32bit:
497            logging.info('User specified not to run 32 bit version LTP tests.')
498            return
499        if self.abi_bitness != None and self.abi_bitness != '32':
500            logging.info('Skipped 32 bit tests on %s bit ABI.',
501                         self.abi_bitness)
502            return
503
504        self.TestNBits(self._32BIT)
505
506
507if __name__ == "__main__":
508    test_runner.main()
509