#!/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import logging
import os

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.controllers import adb
from vts.utils.python.controllers import android_device
from vts.utils.python.os import path_utils

from vts.testcases.security.poc.host import poc_test_config as config


class SecurityPoCKernelTest(base_test.BaseTestClass):
    """Runs security PoC kernel test cases.

    Attributes:
        _dut: AndroidDevice, the device under test (first entry of
              self.android_devices).
        _testcases: string list, list of testcases to run; each entry has
                    the format testsuite/testname.
        start_vts_agents: whether to start vts agents when registering new
                          android devices.
    """
    start_vts_agents = False

    def setUpClass(self):
        """Creates device under test instance and selects the test cases.

        Reads the required user parameters, picks the first android device
        as the device under test, and builds the list of test cases from
        the stable list plus, when run_staging is set, the staging list.
        """
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
            keys.ConfigKeys.IKEY_ABI_BITNESS,
            config.ConfigKeys.RUN_STAGING
        ]
        self.getUserParams(required_params)

        logging.info("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                     self.data_file_path)

        self._dut = self.android_devices[0]
        # Work on a copy: extending the module-level list in place would leak
        # staging test cases into config.POC_TEST_CASES_STABLE and duplicate
        # them every time setUpClass runs again in the same process.
        self._testcases = list(config.POC_TEST_CASES_STABLE)
        if self.run_staging:
            self._testcases.extend(config.POC_TEST_CASES_STAGING)

    def tearDownClass(self):
        """Deletes all copied data."""
        self._dut.adb.shell("rm -rf %s" % config.POC_TEST_DIR)

    def PushFiles(self):
        """adb pushes related file to target.

        Creates config.POC_TEST_DIR on the device and pushes the contents of
        <data_file_path>/DATA/nativetest[64]/security/poc into it. The
        "nativetest64" directory is used when the configured ABI bitness is
        "64", "nativetest" otherwise.
        """
        self._dut.adb.shell("mkdir %s -p" % config.POC_TEST_DIR)

        bitness = getattr(self, keys.ConfigKeys.IKEY_ABI_BITNESS)
        bitness_suffix = "64" if bitness == "64" else ""
        native_test_dir = "nativetest{0}".format(bitness_suffix)
        # The trailing "." makes the push source the directory's contents.
        push_src = os.path.join(self.data_file_path, "DATA", native_test_dir,
                                "security", "poc", ".")
        self._dut.adb.push("%s %s" % (push_src, config.POC_TEST_DIR))

    def CreateHostInput(self, testcase):
        """Gathers information that will be passed to target-side code.

        Args:
            testcase: string, format testsuite/testname, specifies which
                      test case to examine.

        Returns:
            dict, information passed to native PoC test, contains info
            collected from device and config. If None, poc should be skipped.
        """
        out = self._dut.adb.shell("getprop ro.product.model")
        device_model = out.strip()
        testcase_path = os.path.join(*testcase.split("/"))
        test_config_path = os.path.join(
            self.data_file_path, "vts", "testcases", "security", "poc",
            "target", testcase_path, "poc.runner_conf")

        with open(test_config_path) as test_config_file:
            poc_config = json.load(test_config_file)["target_models"]

        # If dut model is not in the test config, test should be skipped.
        if device_model not in poc_config:
            return None

        # Start from a copy of the "default" parameters so that the
        # model-specific overrides do not modify the parsed config entry.
        params = dict(poc_config.get("default", {}))
        params.update(poc_config[device_model])

        host_input = {
            "device_model": device_model,
            "params": params
        }

        return host_input

    def CreateTestFlags(self, host_input):
        """Packs host input info into command line flags.

        Args:
            host_input: dict, information passed to native PoC test; must
                        contain "device_model" (string) and "params" (dict).

        Returns:
            string, host_input packed into command-line flags, of the form
            --device_model="<model>" --params="<k1>=<v1>,<k2>=<v2>".
        """
        device_model_flag = "--device_model=\"%s\"" % host_input["device_model"]

        params = ["%s=%s" % (k, v) for k, v in host_input["params"].items()]
        params = ",".join(params)
        params_flag = "--params=\"%s\"" % params

        test_flags = [device_model_flag, params_flag]
        return " ".join(test_flags)

    def RunTestcase(self, testcase):
        """Runs the given testcase and asserts the result.

        The test case is skipped when CreateHostInput returns no input for
        the device model. Otherwise the test suite directory is made
        executable, the test binary is run through adb shell, and the
        collected result is handed to AssertTestResult.

        Args:
            testcase: string, format testsuite/testname, specifies which
                      test case to run.
        """
        host_input = self.CreateHostInput(testcase)
        asserts.skipIf(not host_input,
                       "%s not configured to run against this target model."
                       % testcase)

        items = testcase.split("/", 1)
        testsuite = items[0]

        chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath(
            config.POC_TEST_DIR, testsuite)
        logging.info("Executing: %s", chmod_cmd)
        self._dut.adb.shell(chmod_cmd)

        test_flags = self.CreateTestFlags(host_input)
        test_cmd = "%s %s" % (
            path_utils.JoinTargetPath(config.POC_TEST_DIR, testcase),
            test_flags)
        logging.info("Executing: %s", test_cmd)

        try:
            stdout = self._dut.adb.shell(test_cmd)
            # No AdbError was raised, so the command is recorded as a
            # success with exit code 0.
            result = {
                const.STDOUT: stdout,
                const.STDERR: "",
                const.EXIT_CODE: 0
            }
        except adb.AdbError as e:
            # The error carries the output and return code of the command.
            result = {
                const.STDOUT: e.stdout,
                const.STDERR: e.stderr,
                const.EXIT_CODE: e.ret_code
            }
        logging.info("Test results:\n%s", result)

        self.AssertTestResult(result)

    def AssertTestResult(self, result):
        """Asserts that testcase finished as expected.

        Checks that device is in responsive state. If it is, the test is
        reported as skipped when the exit code equals
        config.ExitCode.POC_TEST_SKIP and as failed when it equals
        config.ExitCode.POC_TEST_FAIL; any other exit code passes. If the
        device is not responsive, waits for boot, restores root adb and the
        pushed files, then reports the test as failure.

        Args:
            result: dict(str, str, int), stdout, stderr and return code
                    from test run.
        """
        if self._dut.hasBooted():
            exit_code = result[const.EXIT_CODE]
            asserts.skipIf(exit_code == config.ExitCode.POC_TEST_SKIP,
                           "Test case was skipped.")
            asserts.assertFalse(exit_code == config.ExitCode.POC_TEST_FAIL,
                                "Test case failed.")
        else:
            # Bring the device back and push the files again before failing;
            # presumably so that the remaining test cases can still run.
            self._dut.waitForBootCompletion()
            self._dut.rootAdb()
            self.PushFiles()
            asserts.fail("Test case left the device in unresponsive state.")

    def generateSecurityPoCTests(self):
        """Runs security PoC tests.

        Pushes the test files to the device, then runs RunTestcase once per
        entry of self._testcases. Generated test names are the test case
        strings with "/" replaced by "_".
        """
        self.PushFiles()
        self.runGeneratedTests(
            test_func=self.RunTestcase,
            settings=self._testcases,
            name_func=lambda x: x.replace('/','_'))


if __name__ == "__main__":
    test_runner.main()