#!/usr/bin/env python3.5
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import time
import datetime
from acts import utils
from acts import asserts
from acts import signals
from acts.base_test import BaseTestClass
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.gnss import gnss_test_utils as gutils
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from acts_contrib.test_utils.tel import tel_test_utils as tutils

# Maps each location request type to the logcat keyword that is searched for
# when counting the reports of that type.
CONCURRENCY_TYPE = {
    "gnss": "GNSS location received",
    "gnss_meas": "GNSS measurement received",
    "ap_location": "reportLocation"
}


class GnssConcurrencyTest(BaseTestClass):
    """ GNSS Concurrency TTFF Tests. """

    def setup_class(self):
        """ Pick the first Android device, load user params and init it. """
        super().setup_class()
        self.ad = self.android_devices[0]
        req_params = [
            "standalone_cs_criteria", "chre_tolerate_rate", "qdsp6m_path",
            "outlier_criteria", "max_outliers"
        ]
        self.unpack_userparams(req_param_names=req_params)
        gutils._init_device(self.ad)

    def setup_test(self):
        """ Start logging, check device state and load the CHRE nanoapp. """
        gutils.start_pixel_logger(self.ad)
        tutils.start_adb_tcpdump(self.ad)
        # related properties
        gutils.check_location_service(self.ad)
        gutils.get_baseband_and_gms_version(self.ad)
        self.load_chre_nanoapp()

    def teardown_test(self):
        """ Stop the loggers started in setup_test. """
        gutils.stop_pixel_logger(self.ad)
        tutils.stop_adb_tcpdump(self.ad)

    def on_fail(self, test_name, begin_time):
        """ Collect bug report, QXDM log and tcpdump log for a failed test.

        Args:
            test_name: name of the failed test.
            begin_time: begin time of the failed test.
        """
        self.ad.take_bug_report(test_name, begin_time)
        gutils.get_gnss_qxdm_log(self.ad, self.qdsp6m_path)
        tutils.get_tcpdump_log(self.ad, test_name, begin_time)

    def load_chre_nanoapp(self):
        """ Load CHRE nanoapp to target Android Device.

        Tries up to 3 times. The device is rebooted only when the load
        command raises; an attempt that returns without "success: 1" is
        simply retried.

        Raises:
            signals.TestError: if no attempt reports "success: 1".
        """
        for _ in range(3):
            try:
                self.ad.log.info("Start to load the nanoapp")
                res = self.ad.adb.shell("chre_power_test_client load")
                if "success: 1" in res:
                    self.ad.log.info("Nano app loaded successfully")
                    break
            except Exception as e:
                self.ad.log.warning("Nano app loaded fail: %s" % e)
                gutils.reboot(self.ad)
        else:
            # for/else: reached only when the loop never hit `break`.
            raise signals.TestError("Failed to load CHRE nanoapp")

    def enable_gnss_concurrency(self, freq):
        """ Enable or disable gnss concurrency via nanoapp.

        Args:
            freq: an int for frequency, set 0 as disable. The value is
                multiplied by 1000 before it is handed to the client
                (presumably seconds -> milliseconds; TODO confirm).
        """
        freq = freq * 1000
        cmd = "chre_power_test_client"
        option = "enable %d" % freq if freq != 0 else "disable"

        for request_type in CONCURRENCY_TYPE:
            # Request types whose name contains "ap" are not sent to the
            # nanoapp client.
            if "ap" not in request_type:
                self.ad.adb.shell(" ".join([cmd, request_type, option]))

    def run_concurrency_test(self, ap_freq, chre_freq, test_time):
        """ Run the concurrency test with specific sequence.

        Args:
            ap_freq: int for AP side location request frequency.
            chre_freq: int for CHRE side location request frequency.
            test_time: int for test duration in seconds.
        Return: test begin time.
        """
        gutils.process_gnss_by_gtw_gpstool(self.ad, self.standalone_cs_criteria)
        begin_time = utils.get_current_epoch_time()
        gutils.start_gnss_by_gtw_gpstool(self.ad, True, freq=ap_freq)
        self.enable_gnss_concurrency(chre_freq)
        time.sleep(test_time)
        self.enable_gnss_concurrency(0)
        gutils.start_gnss_by_gtw_gpstool(self.ad, False)
        return begin_time

    def parse_concurrency_result(self, begin_time, type, criteria):
        """ Parse the test result with given time and criteria.

        The interval between every two consecutive log entries is compared
        against two thresholds:
            outlier: criteria * chre_tolerate_rate
            failure: criteria * chre_tolerate_rate + outlier_criteria

        Args:
            begin_time: test begin time.
            type: str for location request type, a key of CONCURRENCY_TYPE.
                (The name shadows the builtin but is kept so that existing
                keyword callers keep working.)
            criteria: int for test criteria.
        Return: Tuple (outliers, failures), each a list of the log entries
            that closed an interval over the respective threshold.
        Raises:
            signals.TestFailure: if fewer than two matching log entries are
                found, since no interval can be evaluated then.
        """
        keyword = CONCURRENCY_TYPE[type]
        entries = self.ad.search_logcat(keyword, begin_time) or []
        # Without this guard an empty result raised IndexError and a single
        # entry raised ValueError from max() on an empty list.
        if len(entries) < 2:
            raise signals.TestFailure(
                "Need at least 2 '%s' log entries to evaluate %s intervals, "
                "found %d" % (keyword, type, len(entries)))

        # The trailing space in the format is intentional; it mirrors the
        # format this code has always parsed epoch_to_human_time output with.
        start_time = datetime.datetime.strptime(
            utils.epoch_to_human_time(begin_time), "%m-%d-%Y %H:%M:%S ")

        # results[0] is the delay from test begin to the first entry; the
        # remaining items are the intervals between consecutive entries.
        results = [(entries[0]["datetime_obj"] - start_time).total_seconds()]
        failures = []
        outliers = []
        outlier_threshold = criteria * self.chre_tolerate_rate
        failure_threshold = outlier_threshold + self.outlier_criteria
        for previous, target in zip(entries, entries[1:]):
            timedelt = target["datetime_obj"] - previous["datetime_obj"]
            timedelt_sec = timedelt.total_seconds()
            results.append(timedelt_sec)
            if timedelt_sec > failure_threshold:
                failures.append(target)
                self.ad.log.error("[Failure][%s]:%.2f sec" %
                                  (target["time_stamp"], timedelt_sec))
            elif timedelt_sec > outlier_threshold:
                outliers.append(target)
                self.ad.log.info("[Outlier][%s]:%.2f sec" %
                                 (target["time_stamp"], timedelt_sec))

        samples = len(entries) - 1
        res_summary = " ".join([str(res) for res in results])
        self.ad.log.info("[%s]Overall Result: %s" % (type, res_summary))
        self.ad.log.info("TestResult %s_samples %d" % (type, samples))
        self.ad.log.info("TestResult %s_outliers %d" % (type, len(outliers)))
        self.ad.log.info("TestResult %s_failures %d" % (type, len(failures)))
        # The first item (delay to first entry) is excluded from max_time.
        self.ad.log.info("TestResult %s_max_time %.2f" %
                         (type, max(results[1:])))

        return outliers, failures

    def execute_gnss_concurrency_test(self, criteria, test_duration):
        """ Execute GNSS concurrency test steps.

        Args:
            criteria: dict keyed by request type (the keys of
                CONCURRENCY_TYPE) holding the int test criteria of each type.
            test_duration: int for test duration.
        Raises:
            signals.TestFailure: if any request type has a failure, or has
                more outliers than max_outliers.
        """
        failures = {}
        outliers = {}
        begin_time = self.run_concurrency_test(criteria["ap_location"],
                                               criteria["gnss"], test_duration)
        # Parse every type first so all results are logged before judging.
        for request_type in CONCURRENCY_TYPE:
            self.ad.log.info("Starting process %s result" % request_type)
            outliers[request_type], failures[request_type] = (
                self.parse_concurrency_result(begin_time, request_type,
                                              criteria[request_type]))
        for request_type in CONCURRENCY_TYPE:
            if failures[request_type]:
                raise signals.TestFailure("Test exceeds criteria: %.2f" %
                                          criteria[request_type])
            elif len(outliers[request_type]) > self.max_outliers:
                raise signals.TestFailure("Outliers exceeds max amount: %d" %
                                          len(outliers[request_type]))

    # Test Cases
    def test_gnss_concurrency_ct1(self):
        """ Run for 15 seconds with criteria AP 1, gnss 1, gnss_meas 1. """
        test_duration = 15
        criteria = {"ap_location": 1, "gnss": 1, "gnss_meas": 1}
        self.execute_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_ct2(self):
        """ Run for 30 seconds with criteria AP 1, gnss 8, gnss_meas 8. """
        test_duration = 30
        criteria = {"ap_location": 1, "gnss": 8, "gnss_meas": 8}
        self.execute_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_ct3(self):
        """ Run for 60 seconds with criteria AP 15, gnss 8, gnss_meas 8. """
        test_duration = 60
        criteria = {"ap_location": 15, "gnss": 8, "gnss_meas": 8}
        self.execute_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_aoc1(self):
        """ Run for 120 seconds with criteria AP 61, gnss 1, gnss_meas 1. """
        test_duration = 120
        criteria = {"ap_location": 61, "gnss": 1, "gnss_meas": 1}
        self.execute_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_aoc2(self):
        """ Run for 120 seconds with criteria AP 61, gnss 10, gnss_meas 10. """
        test_duration = 120
        criteria = {"ap_location": 61, "gnss": 10, "gnss_meas": 10}
        self.execute_gnss_concurrency_test(criteria, test_duration)