1# Lint as: python3
2"""Tests for blueberry.tests.bluetooth.bluetooth_throughput."""
3
4from __future__ import absolute_import
5from __future__ import division
6from __future__ import print_function
7
8import logging
9import math
10
11from mobly import asserts
12from mobly import test_runner
13from mobly.controllers.android_device_lib.jsonrpc_client_base import ApiError
14from mobly.signals import TestAbortClass
15# Internal import
16from blueberry.utils import blueberry_base_test
17from blueberry.utils import metrics_utils
18# Internal import
19
20
21class BluetoothThroughputTest(blueberry_base_test.BlueberryBaseTest):
22
23  @retry.logged_retry_on_exception(
24      retry_intervals=retry.FuzzedExponentialIntervals(
25          initial_delay_sec=2, factor=5, num_retries=5, max_delay_sec=300))
26  def _measure_throughput(self, num_of_buffers, buffer_size):
27    """Measures the throughput of a data transfer.
28
29    Sends data from the client device that is read by the server device.
30    Calculates the throughput for the transfer.
31
32    Args:
33        num_of_buffers: An integer value designating the number of buffers
34                      to be sent.
35        buffer_size: An integer value designating the size of each buffer,
36                   in bytes.
37
38    Returns:
39        The throughput of the transfer in bytes per second.
40    """
41
42    # TODO(user): Need to fix throughput send/receive methods
43    (self.phone.sl4a
44     .bluetoothConnectionThroughputSend(num_of_buffers, buffer_size))
45
46    throughput = (self.derived_bt_device.sl4a
47                  .bluetoothConnectionThroughputRead(num_of_buffers,
48                                                     buffer_size))
49    return throughput
50
51  def _throughput_test(self, buffer_size, test_name):
52    logging.info('throughput test with buffer_size: %d and testname: %s',
53                 buffer_size, test_name)
54    metrics = {}
55    throughput_list = []
56    num_of_buffers = 1
57    for _ in range(self.iterations):
58      throughput = self._measure_throughput(num_of_buffers, buffer_size)
59      logging.info('Throughput: %d bytes-per-sec', throughput)
60      throughput_list.append(throughput)
61
62    metrics['data_transfer_protocol'] = self.data_transfer_type
63    metrics['data_packet_size'] = buffer_size
64    metrics['data_throughput_min_bytes_per_second'] = int(
65        min(throughput_list))
66    metrics['data_throughput_max_bytes_per_second'] = int(
67        max(throughput_list))
68    metrics['data_throughput_avg_bytes_per_second'] = int(
69        math.fsum(throughput_list) / float(len(throughput_list)))
70
71    logging.info('Throughput at large buffer: %s', metrics)
72
73    asserts.assert_true(metrics['data_throughput_min_bytes_per_second'] > 0,
74                        'Minimum throughput must be greater than 0!')
75
76    self.metrics.add_test_metrics(metrics)
77    for metric in metrics:
78      self.record_data({
79          'Test Name': test_name,
80          'sponge_properties': {
81              metric: metrics[metric],
82          }
83      })
84    self.record_data({
85        'Test Name': test_name,
86        'sponge_properties': {
87            'proto_ascii':
88                self.metrics.proto_message_to_ascii(),
89            'primary_device_build':
90                self.phone.get_device_info()['android_release_id']
91        }
92    })
93
94  def setup_class(self):
95    """Standard Mobly setup class."""
96    super(BluetoothThroughputTest, self).setup_class()
97    if len(self.android_devices) < 2:
98      raise TestAbortClass(
99          'Not enough android phones detected (need at least two)')
100    self.phone = self.android_devices[0]
101
102    # We treat the secondary phone as a derived_bt_device in order for the
103    # generic script to work with this android phone properly. Data will be sent
104    # from first phone to the second phone.
105    self.derived_bt_device = self.android_devices[1]
106    self.phone.init_setup()
107    self.derived_bt_device.init_setup()
108    self.phone.sl4a_setup()
109    self.derived_bt_device.sl4a_setup()
110    self.set_btsnooplogmode_full(self.phone)
111    self.set_btsnooplogmode_full(self.derived_bt_device)
112
113    self.metrics = (
114        metrics_utils.BluetoothMetricLogger(
115            metrics_pb2.BluetoothDataTestResult()))
116    self.metrics.add_primary_device_metrics(self.phone)
117    self.metrics.add_connected_device_metrics(self.derived_bt_device)
118
119    self.data_transfer_type = metrics_pb2.BluetoothDataTestResult.RFCOMM
120    self.iterations = int(self.user_params.get('iterations', 300))
121    logging.info('Running Bluetooth throughput test %s times.', self.iterations)
122    logging.info('Successfully found required devices.')
123
124  def setup_test(self):
125    """Setup for bluetooth latency test."""
126    logging.info('Setup Test for test_bluetooth_throughput')
127    super(BluetoothThroughputTest, self).setup_test()
128    asserts.assert_true(self.phone.connect_with_rfcomm(self.derived_bt_device),
129                        'Failed to establish RFCOMM connection')
130
131  def test_bluetooth_throughput_large_buffer(self):
132    """Tests the throughput with large buffer size.
133
134    Tests the throughput over a series of data transfers with large buffer size.
135    """
136    large_buffer_size = 300
137    test_name = 'test_bluetooth_throughput_large_buffer'
138    self._throughput_test(large_buffer_size, test_name)
139
140  def test_bluetooth_throughput_medium_buffer(self):
141    """Tests the throughput with medium buffer size.
142
143    Tests the throughput over a series of data transfers with medium buffer
144    size.
145    """
146    medium_buffer_size = 100
147    test_name = 'test_bluetooth_throughput_medium_buffer'
148    self._throughput_test(medium_buffer_size, test_name)
149
150  def test_bluetooth_throughput_small_buffer(self):
151    """Tests the throughput with small buffer size.
152
153    Tests the throughput over a series of data transfers with small buffer size.
154    """
155    small_buffer_size = 10
156    test_name = 'test_bluetooth_throughput_small_buffer'
157    self._throughput_test(small_buffer_size, test_name)
158
159  def test_maximum_buffer_size(self):
160    """Calculates the maximum allowed buffer size for one packet."""
161    current_buffer_size = 300
162    throughput = -1
163    num_of_buffers = 1
164    while True:
165      logging.info('Trying buffer size %d', current_buffer_size)
166      try:
167        throughput = self._measure_throughput(
168            num_of_buffers, current_buffer_size)
169        logging.info('The throughput is %d at buffer size of %d', throughput,
170                     current_buffer_size)
171      except ApiError:
172        maximum_buffer_size = current_buffer_size - 1
173        logging.info('Max buffer size: %d bytes', maximum_buffer_size)
174        logging.info('Max throughput: %d bytes-per-second', throughput)
175        self.record_data({
176            'Test Name': 'test_maximum_buffer_size',
177            'sponge_properties': {
178                'maximum_buffer_size': maximum_buffer_size
179            }
180        })
181        return True
182      current_buffer_size += 1
183
184  def teardown_test(self):
185    self.phone.sl4a.bluetoothSocketConnStop()
186    self.derived_bt_device.sl4a.bluetoothSocketConnStop()
187
188  def teardown_class(self):
189    self.phone.factory_reset_bluetooth()
190    self.derived_bt_device.factory_reset_bluetooth()
191    logging.info('Factory resetting Bluetooth on devices.')
192    super(BluetoothThroughputTest, self).teardown_class()
193
194
195if __name__ == '__main__':
196  test_runner.main()
197