1#  Copyright (C) 2023 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
"""Mobly base test class for Nearby Connections."""
16
17import dataclasses
18import datetime
19import logging
20import time
21
22from mobly import asserts
23from mobly import base_test
24from mobly import records
25from mobly import utils
26from mobly.controllers import android_device
27from mobly.controllers.android_device_lib import errors
28
29from performance_test import nc_constants
30from performance_test import setup_utils
31
# Package name of the Nearby snippet app. It is used when loading the
# 'nearby' snippet on a device and when granting that app the
# manage-external-storage permission.
NEARBY_SNIPPET_PACKAGE_NAME = 'com.google.android.nearby.mobly.snippet'
33
34
35class NCBaseTestClass(base_test.BaseTestClass):
36  """Nearby Connection E2E tests."""
37
38  def __init__(self, configs):
39    super().__init__(configs)
40    self.ads: list[android_device.AndroidDevice] = []
41    self.advertiser: android_device.AndroidDevice = None
42    self.discoverer: android_device.AndroidDevice = None
43    self.test_parameters: nc_constants.TestParameters = None
44    self._test_script_version = None
45    self._nearby_snippet_apk_path: str = None
46    self.performance_test_iterations: int = 1
47    self.num_bug_reports: int = 0
48
49  def setup_class(self) -> None:
50    self.ads = self.register_controller(android_device, min_number=2)
51    self.test_parameters = self._get_test_parameter()
52    self._nearby_snippet_apk_path = self.user_params.get('files', {}).get(
53        'nearby_snippet', [''])[0]
54
55    # set run identifier property
56    self._set_run_identifier()
57
58    utils.concurrent_exec(
59        self._setup_android_device,
60        param_list=[[ad] for ad in self.ads],
61        raise_on_exception=True,
62    )
63
64    try:
65      self.discoverer = android_device.get_device(
66          self.ads, role='source_device'
67      )
68      self.advertiser = android_device.get_device(
69          self.ads, role='target_device'
70      )
71    except errors.Error:
72      logging.warning(
73          'The source,target devices are not specified in testbed;'
74          'The result may not be expected.'
75      )
76      self.advertiser, self.discoverer = self.ads
77
78  def _set_run_identifier(self) -> None:
79    """Set a run_identifier property describing the test run context."""
80    run_identifier = {}
81    run_identifier['test_version'] = self._test_script_version
82    run_identifier['alias'] = self.test_parameters.test_report_alias_name
83    run_identifier['devices'] = [
84        f'{ad.model}({ad.build_info["build_id"]})' for ad in self.ads
85    ]
86    run_identifier_str = ', '.join(
87        [f'{key}:{value}' for key, value in run_identifier.items()]
88    )
89    run_identifier_str = f'{{{run_identifier_str}}}'
90    self.record_data(
91        {'properties': {'run_identifier': run_identifier_str}}
92    )
93
94  def _disconnect_from_wifi(self, ad: android_device.AndroidDevice) -> None:
95    if not ad.is_adb_root:
96      ad.log.info("Can't clear wifi network in non-rooted device")
97      return
98    ad.nearby.wifiClearConfiguredNetworks()
99    time.sleep(nc_constants.WIFI_DISCONNECTION_DELAY.total_seconds())
100
101  def _setup_android_device(self, ad: android_device.AndroidDevice) -> None:
102    if not ad.is_adb_root:
103      if self.test_parameters.allow_unrooted_device:
104        ad.log.info('Unrooted device is detected. Test coverage is limited')
105      else:
106        asserts.skip('The test only can run on rooted device.')
107
108    setup_utils.disable_gms_auto_updates(ad)
109
110    ad.debug_tag = ad.serial + '(' + ad.adb.getprop('ro.product.model') + ')'
111    ad.log.info('try to install nearby_snippet_apk')
112    if self._nearby_snippet_apk_path:
113      setup_utils.install_apk(ad, self._nearby_snippet_apk_path)
114    else:
115      ad.log.warning(
116          'nearby_snippet apk is not specified, '
117          'make sure it is installed in the device'
118      )
119    ad.load_snippet('nearby', NEARBY_SNIPPET_PACKAGE_NAME)
120
121    ad.log.info('grant manage external storage permission')
122    setup_utils.grant_manage_external_storage_permission(
123        ad, NEARBY_SNIPPET_PACKAGE_NAME
124    )
125
126    if not ad.nearby.wifiIsEnabled():
127      ad.nearby.wifiEnable()
128    self._disconnect_from_wifi(ad)
129    setup_utils.enable_logs(ad)
130
131    setup_utils.disable_redaction(ad)
132
133    if (
134        self.test_parameters.upgrade_medium
135        == nc_constants.NearbyMedium.WIFIAWARE_ONLY.value
136    ):
137      setup_utils.enable_wifi_aware(ad)
138
139    if self.test_parameters.wifi_country_code:
140      setup_utils.set_country_code(
141          ad, self.test_parameters.wifi_country_code
142      )
143
  def setup_test(self) -> None:
    """Per-test setup: resets the nearby connection state on both devices."""
    self._reset_nearby_connection()
146
147  def _reset_wifi_connection(self) -> None:
148    """Resets wifi connections on both devices."""
149    self.discoverer.nearby.wifiClearConfiguredNetworks()
150    self.advertiser.nearby.wifiClearConfiguredNetworks()
151    time.sleep(nc_constants.WIFI_DISCONNECTION_DELAY.total_seconds())
152
153  def _reset_nearby_connection(self) -> None:
154    """Resets nearby connection."""
155    self.discoverer.nearby.stopDiscovery()
156    self.discoverer.nearby.stopAllEndpoints()
157    self.advertiser.nearby.stopAdvertising()
158    self.advertiser.nearby.stopAllEndpoints()
159    time.sleep(nc_constants.NEARBY_RESET_WAIT_TIME.total_seconds())
160
161  def _teardown_device(self, ad: android_device.AndroidDevice) -> None:
162    ad.nearby.transferFilesCleanup()
163    setup_utils.enable_gms_auto_updates(ad)
164    if self.test_parameters.disconnect_wifi_after_test:
165      self._disconnect_from_wifi(ad)
166    ad.unload_snippet('nearby')
167
168  def teardown_test(self) -> None:
169    utils.concurrent_exec(
170        lambda d: d.services.create_output_excerpts_all(self.current_test_info),
171        param_list=[[ad] for ad in self.ads],
172        raise_on_exception=True,
173    )
174
175  def teardown_class(self) -> None:
176    utils.concurrent_exec(
177        self._teardown_device,
178        param_list=[[ad] for ad in self.ads],
179        raise_on_exception=True,
180    )
181    # handle summary results
182    self._summary_test_results()
183
  def _summary_test_results(self) -> None:
    """Summarizes the test results; does nothing in this base class.

    Invoked at the end of teardown_class. Presumably meant to be overridden
    by subclasses -- confirm against the concrete test classes.
    """
    pass
186
187  def _get_test_parameter(self) -> nc_constants.TestParameters:
188    test_parameters_names = {
189        field.name for field in dataclasses.fields(nc_constants.TestParameters)
190    }
191    test_parameters = nc_constants.TestParameters(
192        **{
193            key: val
194            for key, val in self.user_params.items()
195            if key in test_parameters_names
196        }
197    )
198
199    return test_parameters
200
201  def on_fail(self, record: records.TestResultRecord) -> None:
202    self.num_bug_reports = self.num_bug_reports + 1
203    if (self.num_bug_reports <= nc_constants.MAX_NUM_BUG_REPORT):
204      logging.info('take bug report for failure')
205      android_device.take_bug_reports(
206          self.ads,
207          destination=self.current_test_info.output_path,
208    )
209
210  def _stats_throughput_result(
211      self,
212      medium_name: str,
213      throughput_indicators: list[float],
214      success_rate_target: float,
215      median_benchmark_kbps: float,
216  ) -> nc_constants.ThroughputResultStats:
217    """Statistics the throughput test result of all iterations."""
218    n = self.performance_test_iterations
219    filtered = [
220        x
221        for x in throughput_indicators
222        if x != nc_constants.UNSET_THROUGHPUT_KBPS
223    ]
224    if not filtered:
225      # all test cases are failed
226      return nc_constants.ThroughputResultStats(
227          success_rate=0.0,
228          average_kbps=0.0,
229          percentile_50_kbps=0.0,
230          percentile_95_kbps=0.0,
231          success_count=0,
232          fail_targets=[
233              nc_constants.FailTargetSummary(
234                  f'{medium_name} transfer success rate',
235                  0.0,
236                  success_rate_target,
237                  '%',
238              )
239          ],
240      )
241    # use the descenting order of the throughput
242    filtered.sort(reverse=True)
243    success_count = len(filtered)
244    success_rate = round(
245        success_count * 100.0 / n, nc_constants.SUCCESS_RATE_PRECISION_DIGITS
246    )
247    average_kbps = round(sum(filtered) / len(filtered))
248    percentile_50_kbps = filtered[
249        int(len(filtered) * nc_constants.PERCENTILE_50_FACTOR)
250    ]
251    percentile_95_kbps = filtered[
252        int(len(filtered) * nc_constants.PERCENTILE_95_FACTOR)
253    ]
254    fail_targets: list[nc_constants.FailTargetSummary] = []
255    if success_rate < success_rate_target:
256      fail_targets.append(
257          nc_constants.FailTargetSummary(
258              f'{medium_name} transfer success rate',
259              success_rate,
260              success_rate_target,
261              '%',
262          )
263      )
264    if percentile_50_kbps < median_benchmark_kbps:
265      fail_targets.append(
266          nc_constants.FailTargetSummary(
267              f'{medium_name} median transfer speed (KBps)',
268              percentile_50_kbps,
269              median_benchmark_kbps,
270          )
271      )
272    return nc_constants.ThroughputResultStats(
273        success_rate,
274        average_kbps,
275        percentile_50_kbps,
276        percentile_95_kbps,
277        success_count,
278        fail_targets,
279    )
280
281  def _stats_latency_result(
282      self, latency_indicators: list[datetime.timedelta]
283  ) -> nc_constants.LatencyResultStats:
284    n = self.performance_test_iterations
285    filtered = [
286        latency.total_seconds()
287        for latency in latency_indicators
288        if latency != nc_constants.UNSET_LATENCY
289    ]
290    if not filtered:
291      # All test cases are failed.
292      return nc_constants.LatencyResultStats(
293          average_latency=0.0,
294          percentile_50=0.0,
295          percentile_95=0.0,
296          failure_count=n,
297      )
298
299    filtered.sort()
300    average = (
301        round(
302            sum(filtered) / len(filtered), nc_constants.LATENCY_PRECISION_DIGITS
303        )
304        / n
305    )
306    percentile_50 = round(
307        filtered[int(len(filtered) * nc_constants.PERCENTILE_50_FACTOR)],
308        nc_constants.LATENCY_PRECISION_DIGITS,
309    )
310    percentile_95 = round(
311        filtered[int(len(filtered) * nc_constants.PERCENTILE_95_FACTOR)],
312        nc_constants.LATENCY_PRECISION_DIGITS,
313    )
314
315    return nc_constants.LatencyResultStats(
316        average, percentile_50, percentile_95, n - len(filtered)
317    )
318
319  def _generate_target_fail_message(
320      self, fail_targets: list[nc_constants.FailTargetSummary]
321  ) -> str:
322    return ''.join(
323        f'{fail_target.title}: {fail_target.actual}{fail_target.unit}'
324        f' < {fail_target.goal}{fail_target.unit}\n'
325        for fail_target in fail_targets
326    )
327