1#!/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import json
19import logging
20import os
21
22from vts.runners.host import asserts
23from vts.runners.host import base_test
24from vts.runners.host import const
25from vts.runners.host import keys
26from vts.runners.host import test_runner
27from vts.utils.python.controllers import adb
28from vts.utils.python.controllers import android_device
29from vts.utils.python.os import path_utils
30
31from vts.testcases.security.poc.host import poc_test_config as config
32
class SecurityPoCKernelTest(base_test.BaseTestClass):
    """Runs security PoC kernel test cases.

    Attributes:
        _dut: AndroidDevice, the device under test (first configured device).
        _testcases: string list, list of testcases to run. This is a private
                    copy; the lists in the config module are never modified.
        start_vts_agents: whether to start vts agents when registering new
                          android devices.
    """
    start_vts_agents = False

    def setUpClass(self):
        """Loads user parameters, picks the device and selects test cases.

        The stable test cases are always selected; the staging test cases
        are added when the RUN_STAGING user parameter is truthy.
        """
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
            keys.ConfigKeys.IKEY_ABI_BITNESS,
            config.ConfigKeys.RUN_STAGING
        ]
        # The parameters are read back below as attributes of self
        # (data_file_path, run_staging, and the ABI bitness in PushFiles).
        self.getUserParams(required_params)

        logging.info("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                     self.data_file_path)

        self._dut = self.android_devices[0]
        # Work on a copy: extending the config list itself would leak the
        # staging cases into the shared module-level constant, and they would
        # pile up again on every further setUpClass call in this process.
        self._testcases = list(config.POC_TEST_CASES_STABLE)
        if self.run_staging:
            self._testcases.extend(config.POC_TEST_CASES_STAGING)

    def tearDownClass(self):
        """Deletes the PoC test directory from the device."""
        self._dut.adb.shell("rm -rf %s" % config.POC_TEST_DIR)

    def PushFiles(self):
        """adb pushes related file to target.

        Creates the PoC test directory on the device and pushes the contents
        of the host-side nativetest (or nativetest64) security/poc directory
        into it.
        """
        self._dut.adb.shell("mkdir %s -p" % config.POC_TEST_DIR)

        bitness = getattr(self, keys.ConfigKeys.IKEY_ABI_BITNESS)
        # Only the exact string "64" selects the nativetest64 directory.
        bitness_suffix = "64" if bitness == "64" else ""
        native_test_dir = "nativetest{0}".format(bitness_suffix)
        # The trailing "." makes the source path end in "/.".
        # NOTE(review): presumably so adb copies the directory contents
        # rather than the directory itself; confirm.
        push_src = os.path.join(self.data_file_path, "DATA", native_test_dir,
                                "security", "poc", ".")
        self._dut.adb.push("%s %s" % (push_src, config.POC_TEST_DIR))

    def CreateHostInput(self, testcase):
        """Gathers information that will be passed to target-side code.

        Reads the device model from the device and the "target_models"
        section of the test case's poc.runner_conf file on the host.

        Args:
            testcase: string, format testsuite/testname, specifies which
                test case to examine.

        Returns:
            dict, information passed to native PoC test, contains info collected
                from device and config. If None, poc should be skipped.
        """
        out = self._dut.adb.shell("getprop ro.product.model")
        device_model = out.strip()
        testcase_path = os.path.join(*testcase.split("/"))
        test_config_path = os.path.join(
            self.data_file_path, "vts", "testcases", "security", "poc",
            "target", testcase_path, "poc.runner_conf")

        with open(test_config_path) as test_config_file:
            poc_config = json.load(test_config_file)["target_models"]

        # If dut model is not in the test config, test should be skipped.
        if device_model not in poc_config:
            return None

        # Start from a copy of the "default" entry so that the model-specific
        # overrides do not modify the parsed configuration itself.
        params = dict(poc_config.get("default", {}))
        params.update(poc_config[device_model])

        return {
            "device_model": device_model,
            "params": params
        }

    def CreateTestFlags(self, host_input):
        """Packs host input info into command line flags.

        Args:
            host_input: dict, information passed to native PoC test.

        Returns:
            string, host_input packed into command-line flags, in the form
                --device_model="<model>" --params="k1=v1,k2=v2".
        """
        device_model_flag = "--device_model=\"%s\"" % host_input["device_model"]

        params = ",".join(
            "%s=%s" % (k, v) for k, v in host_input["params"].items())
        params_flag = "--params=\"%s\"" % params

        return " ".join([device_model_flag, params_flag])

    def RunTestcase(self, testcase):
        """Runs the given testcase and asserts the result.

        The test is skipped when the device model is not configured for it.

        Args:
            testcase: string, format testsuite/testname, specifies which
                test case to run.
        """
        host_input = self.CreateHostInput(testcase)
        asserts.skipIf(not host_input,
                       "%s not configured to run against this target model."
                       % testcase)

        testsuite = testcase.split("/", 1)[0]

        # Make everything under the test suite directory executable.
        chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath(
            config.POC_TEST_DIR, testsuite)
        logging.info("Executing: %s", chmod_cmd)
        self._dut.adb.shell(chmod_cmd)

        test_flags = self.CreateTestFlags(host_input)
        test_cmd = "%s %s" % (
            path_utils.JoinTargetPath(config.POC_TEST_DIR, testcase),
            test_flags)
        logging.info("Executing: %s", test_cmd)

        try:
            stdout = self._dut.adb.shell(test_cmd)
            # No AdbError was raised, so the run is recorded as exit code 0.
            result = {
                const.STDOUT: stdout,
                const.STDERR: "",
                const.EXIT_CODE: 0
            }
        except adb.AdbError as e:
            # Take the outcome from the error so the exit code can be judged.
            result = {
                const.STDOUT: e.stdout,
                const.STDERR: e.stderr,
                const.EXIT_CODE: e.ret_code
            }
        logging.info("Test results:\n%s", result)

        self.AssertTestResult(result)

    def AssertTestResult(self, result):
        """Asserts that testcase finished as expected.

        Checks that device is in responsive state. If not, waits for boot,
        restores root and the pushed files, then reports test as failure.
        If it is, the test is skipped when the exit code equals
        POC_TEST_SKIP and fails when it equals POC_TEST_FAIL; any other exit
        code is not treated as a failure here.

        Args:
            result: dict(str, str, int), stdout, stderr and return code
                from test run.
        """
        if self._dut.hasBooted():
            exit_code = result[const.EXIT_CODE]
            asserts.skipIf(exit_code == config.ExitCode.POC_TEST_SKIP,
                           "Test case was skipped.")
            asserts.assertFalse(exit_code == config.ExitCode.POC_TEST_FAIL,
                                "Test case failed.")
        else:
            # Bring the device back to a usable state for the remaining
            # test cases before failing this one.
            self._dut.waitForBootCompletion()
            self._dut.rootAdb()
            self.PushFiles()
            asserts.fail("Test case left the device in unresponsive state.")

    def generateSecurityPoCTests(self):
        """Runs security PoC tests.

        Pushes the test files once, then runs RunTestcase for every selected
        test case; generated test names replace "/" with "_".
        """
        self.PushFiles()
        self.runGeneratedTests(
            test_func=self.RunTestcase,
            settings=self._testcases,
            name_func=lambda x: x.replace('/', '_'))
202
# Script entry point: hand control to the VTS test runner only when this file
# is executed directly, not when it is imported.
# NOTE(review): presumably test_runner.main() locates and runs the test class
# defined in this module; confirm against vts.runners.host.test_runner.
if __name__ == "__main__":
    test_runner.main()
205