1#!/usr/bin/env python3.5
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import time
18import datetime
19from acts import utils
20from acts import asserts
21from acts import signals
22from acts.base_test import BaseTestClass
23from acts.test_decorators import test_tracker_info
24from acts_contrib.test_utils.gnss import gnss_test_utils as gutils
25from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
26from acts_contrib.test_utils.tel import tel_test_utils as tutils
27
28CONCURRENCY_TYPE = {
29    "gnss": "GNSS location received",
30    "gnss_meas": "GNSS measurement received",
31    "ap_location": "reportLocation"
32}
33
34
35class GnssConcurrencyTest(BaseTestClass):
36    """ GNSS Concurrency TTFF Tests. """
37
38    def setup_class(self):
39        super().setup_class()
40        self.ad = self.android_devices[0]
41        req_params = [
42            "standalone_cs_criteria", "chre_tolerate_rate", "qdsp6m_path",
43            "outlier_criteria", "max_outliers"
44        ]
45        self.unpack_userparams(req_param_names=req_params)
46        gutils._init_device(self.ad)
47
48    def setup_test(self):
49        gutils.start_pixel_logger(self.ad)
50        tutils.start_adb_tcpdump(self.ad)
51        # related properties
52        gutils.check_location_service(self.ad)
53        gutils.get_baseband_and_gms_version(self.ad)
54        self.load_chre_nanoapp()
55
56    def teardown_test(self):
57        gutils.stop_pixel_logger(self.ad)
58        tutils.stop_adb_tcpdump(self.ad)
59
60    def on_fail(self, test_name, begin_time):
61        self.ad.take_bug_report(test_name, begin_time)
62        gutils.get_gnss_qxdm_log(self.ad, self.qdsp6m_path)
63        tutils.get_tcpdump_log(self.ad, test_name, begin_time)
64
65    def load_chre_nanoapp(self):
66        """ Load CHRE nanoapp to target Android Device. """
67        for _ in range(0, 3):
68            try:
69                self.ad.log.info("Start to load the nanoapp")
70                res = self.ad.adb.shell("chre_power_test_client load")
71                if "success: 1" in res:
72                    self.ad.log.info("Nano app loaded successfully")
73                    break
74            except Exception as e:
75                self.ad.log.warning("Nano app loaded fail: %s" % e)
76                gutils.reboot(self.ad)
77        else:
78            raise signals.TestError("Failed to load CHRE nanoapp")
79
80    def enable_gnss_concurrency(self, freq):
81        """ Enable or disable gnss concurrency via nanoapp.
82
83        Args:
84            freq: an int for frequency, set 0 as disable.
85        """
86        freq = freq * 1000
87        cmd = "chre_power_test_client"
88        option = "enable %d" % freq if freq != 0 else "disable"
89
90        for type in CONCURRENCY_TYPE.keys():
91            if "ap" not in type:
92                self.ad.adb.shell(" ".join([cmd, type, option]))
93
94    def run_concurrency_test(self, ap_freq, chre_freq, test_time):
95        """ Run the concurrency test with specific sequence.
96
97        Args:
98            ap_freq: int for AP side location request frequency.
99            chre_freq: int forCHRE side location request frequency.
100            test_time: int for test duration.
101        Return: test begin time.
102        """
103        gutils.process_gnss_by_gtw_gpstool(self.ad, self.standalone_cs_criteria)
104        begin_time = utils.get_current_epoch_time()
105        gutils.start_gnss_by_gtw_gpstool(self.ad, True, freq=ap_freq)
106        self.enable_gnss_concurrency(chre_freq)
107        time.sleep(test_time)
108        self.enable_gnss_concurrency(0)
109        gutils.start_gnss_by_gtw_gpstool(self.ad, False)
110        return begin_time
111
112    def parse_concurrency_result(self, begin_time, type, criteria):
113        """ Parse the test result with given time and criteria.
114
115        Args:
116            begin_time: test begin time.
117            type: str for location request type.
118            criteria: int for test criteria.
119        Return: List for the failure and outlier loops.
120        """
121        results = []
122        failures = []
123        outliers = []
124        search_results = self.ad.search_logcat(CONCURRENCY_TYPE[type],
125                                               begin_time)
126        start_time = utils.epoch_to_human_time(begin_time)
127        start_time = datetime.datetime.strptime(start_time,
128                                                "%m-%d-%Y %H:%M:%S ")
129        results.append(
130            (search_results[0]["datetime_obj"] - start_time).total_seconds())
131        samples = len(search_results) - 1
132        for i in range(samples):
133            target = search_results[i + 1]
134            timedelt = target["datetime_obj"] - search_results[i]["datetime_obj"]
135            timedelt_sec = timedelt.total_seconds()
136            results.append(timedelt_sec)
137            if timedelt_sec > (criteria *
138                               self.chre_tolerate_rate) + self.outlier_criteria:
139                failures.append(target)
140                self.ad.log.error("[Failure][%s]:%.2f sec" %
141                                  (target["time_stamp"], timedelt_sec))
142            elif timedelt_sec > criteria * self.chre_tolerate_rate:
143                outliers.append(target)
144                self.ad.log.info("[Outlier][%s]:%.2f sec" %
145                                 (target["time_stamp"], timedelt_sec))
146
147        res_summary = " ".join([str(res) for res in results])
148        self.ad.log.info("[%s]Overall Result: %s" % (type, res_summary))
149        self.ad.log.info("TestResult %s_samples %d" % (type, samples))
150        self.ad.log.info("TestResult %s_outliers %d" % (type, len(outliers)))
151        self.ad.log.info("TestResult %s_failures %d" % (type, len(failures)))
152        self.ad.log.info("TestResult %s_max_time %.2f" %
153                         (type, max(results[1:])))
154
155        return outliers, failures
156
157    def execute_gnss_concurrency_test(self, criteria, test_duration):
158        """ Execute GNSS concurrency test steps.
159
160        Args:
161            criteria: int for test criteria.
162            test_duration: int for test duration.
163        """
164        failures = {}
165        outliers = {}
166        begin_time = self.run_concurrency_test(criteria["ap_location"],
167                                               criteria["gnss"], test_duration)
168        for type in CONCURRENCY_TYPE.keys():
169            self.ad.log.info("Starting process %s result" % type)
170            outliers[type], failures[type] = self.parse_concurrency_result(
171                begin_time, type, criteria[type])
172        for type in CONCURRENCY_TYPE.keys():
173            if len(failures[type]) > 0:
174                raise signals.TestFailure("Test exceeds criteria: %.2f" %
175                                          criteria[type])
176            elif len(outliers[type]) > self.max_outliers:
177                raise signals.TestFailure("Outliers excceds max amount: %d" %
178                                          len(outliers[type]))
179
180    # Test Cases
181    def test_gnss_concurrency_ct1(self):
182        test_duration = 15
183        criteria = {"ap_location": 1, "gnss": 1, "gnss_meas": 1}
184        self.execute_gnss_concurrency_test(criteria, test_duration)
185
186    def test_gnss_concurrency_ct2(self):
187        test_duration = 30
188        criteria = {"ap_location": 1, "gnss": 8, "gnss_meas": 8}
189        self.execute_gnss_concurrency_test(criteria, test_duration)
190
191    def test_gnss_concurrency_ct3(self):
192        test_duration = 60
193        criteria = {"ap_location": 15, "gnss": 8, "gnss_meas": 8}
194        self.execute_gnss_concurrency_test(criteria, test_duration)
195
196    def test_gnss_concurrency_aoc1(self):
197        test_duration = 120
198        criteria = {"ap_location": 61, "gnss": 1, "gnss_meas": 1}
199        self.execute_gnss_concurrency_test(criteria, test_duration)
200
201    def test_gnss_concurrency_aoc2(self):
202        test_duration = 120
203        criteria = {"ap_location": 61, "gnss": 10, "gnss_meas": 10}
204        self.execute_gnss_concurrency_test(criteria, test_duration)
205