1#/usr/bin/env python3.4
2#
3# Copyright (C) 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17This test script exercises set PHY and read PHY procedures.
18"""
19
20from queue import Empty
21
22from acts.test_decorators import test_tracker_info
23from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
24from acts.test_utils.bt.GattConnectedBaseTest import GattConnectedBaseTest
25from acts.test_utils.bt.bt_constants import gatt_connection_priority
26from acts.test_utils.bt.bt_constants import gatt_event
27from acts.test_utils.bt.bt_constants import gatt_phy
28from acts import signals
29
30CONNECTION_PRIORITY_HIGH = gatt_connection_priority['high']
31PHY_LE_1M = gatt_phy['1m']
32PHY_LE_2M = gatt_phy['2m']
33
34
35def lfmt(txPhy, rxPhy):
36    return '(' + list(gatt_phy.keys())[list(gatt_phy.values()).index(
37        txPhy)] + ', ' + list(gatt_phy.keys())[list(gatt_phy.values()).index(
38            rxPhy)] + ')'
39
40
41class PhyTest(GattConnectedBaseTest):
42    def setup_class(self):
43        if not self.cen_ad.droid.bluetoothIsLe2MPhySupported():
44            raise signals.TestSkipClass(
45                "Central device does not support LE 2M PHY")
46
47        if not self.per_ad.droid.bluetoothIsLe2MPhySupported():
48            raise signals.TestSkipClass(
49                "Peripheral device does not support LE 2M PHY")
50
51    # Some controllers auto-update PHY to 2M, and both client and server
52    # might receive PHY Update event right after connection, if the
53    # connection was established over 1M PHY. We will ignore this event, but
54    # must pop it from queue.
55    def pop_initial_phy_update(self):
56        try:
57            maybe_event = gatt_event['phy_update']['evt'].format(
58                self.gatt_callback)
59            self.cen_ad.ed.pop_event(maybe_event, 0)
60        except Empty:
61            pass
62
63        try:
64            maybe_event = gatt_event['serv_phy_update']['evt'].format(
65                self.gatt_server_callback)
66            self.per_ad.ed.pop_event(maybe_event, 0)
67        except Empty:
68            pass
69
70    # this helper method checks wether both client and server received PHY
71    # update event with proper txPhy and rxPhy
72    def ensure_both_updated_phy(self, clientTxPhy, clientRxPhy):
73        event = self._client_wait(gatt_event['phy_update'])
74        txPhy = event['data']['TxPhy']
75        rxPhy = event['data']['RxPhy']
76        self.log.info("\tClient PHY updated: " + lfmt(txPhy, rxPhy))
77        self.assertEqual(0, event['data']['Status'], "Status should be 0")
78        self.assertEqual(clientTxPhy, event['data']['TxPhy'])
79        self.assertEqual(clientRxPhy, event['data']['RxPhy'])
80
81        bt_device_id = 0
82        event = self._server_wait(gatt_event['serv_phy_update'])
83        txPhy = event['data']['TxPhy']
84        rxPhy = event['data']['RxPhy']
85        self.log.info("\tServer PHY updated: " + lfmt(txPhy, rxPhy))
86        self.assertEqual(0, event['data']['Status'], "Status should be 0")
87        self.assertEqual(clientRxPhy, event['data']['TxPhy'])
88        self.assertEqual(clientTxPhy, event['data']['RxPhy'])
89
90    # read the client phy, return (txPhy, rxPhy)
91    def read_client_phy(self):
92        self.cen_ad.droid.gattClientReadPhy(self.bluetooth_gatt)
93        event = self._client_wait(gatt_event['phy_read'])
94        self.assertEqual(0, event['data']['Status'], "Status should be 0")
95        return (event['data']['TxPhy'], event['data']['RxPhy'])
96
97    # read the server phy, return (txPhy, rxPhy)
98    def read_server_phy(self):
99        bt_device_id = 0
100        self.per_ad.droid.gattServerReadPhy(self.gatt_server, bt_device_id)
101        event = self._server_wait(gatt_event['serv_phy_read'])
102        self.assertEqual(0, event['data']['Status'], "Status should be 0")
103        return (event['data']['TxPhy'], event['data']['RxPhy'])
104
105    @BluetoothBaseTest.bt_test_wrap
106    @test_tracker_info(uuid='edb95ae1-97e5-4337-9a60-1e113aa43a4d')
107    def test_phy_read(self):
108        """Test LE read PHY.
109
110        Test LE read PHY.
111
112        Steps:
113        1. Central, Peripheral : read PHY, make sure values are same.
114        2. Central: update PHY.
115        3. Ensure both Central and Peripheral received PHY update event.
116        4. Central, Peripheral: read PHY, make sure values are same.
117
118        Expected Result:
119        Verify that read PHY works properly.
120
121        Returns:
122          Pass if True
123          Fail if False
124
125        TAGS: LE, PHY
126        Priority: 0
127        """
128        self.cen_ad.droid.gattClientRequestConnectionPriority(
129            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
130        self.pop_initial_phy_update()
131
132        # read phy from client and server, make sure they're same
133        cTxPhy, cRxPhy = self.read_client_phy()
134        sTxPhy, sRxPhy = self.read_server_phy()
135        self.assertEqual(cTxPhy, sTxPhy)
136        self.assertEqual(cRxPhy, sRxPhy)
137
138        self.log.info("Initial connection PHY was: " + lfmt(cTxPhy, cRxPhy))
139
140        nextTxPhy = (cTxPhy == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M
141        nextRxPhy = (cRxPhy == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M
142
143        # try to update PHY from Client
144        self.log.info("Will try to set PHY to: " + lfmt(nextTxPhy, nextRxPhy))
145        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
146                                                    nextTxPhy, nextRxPhy, 0)
147        self.ensure_both_updated_phy(nextTxPhy, nextRxPhy)
148
149        # read phy on client and server, make sure values are same and equal
150        # the newly set value
151        cTxPhy, cRxPhy = self.read_client_phy()
152        sTxPhy, sRxPhy = self.read_server_phy()
153        self.assertEqual(cTxPhy, sTxPhy)
154        self.assertEqual(cRxPhy, sRxPhy)
155
156        self.assertEqual(nextTxPhy, cTxPhy)
157        self.assertEqual(nextRxPhy, cRxPhy)
158        return True
159
160    @BluetoothBaseTest.bt_test_wrap
161    @test_tracker_info(uuid='6b66af0a-35eb-42af-acd5-9634684f275d')
162    def test_phy_change_20_times(self):
163        """Test PHY update.
164
165        Test LE PHY update.
166
167        Steps:
168        1. Central: read PHY.
169        2. Central: update PHY to 1M, 2M, 1M... 20 times, each time ensuring
170                    both client and server received PHY update event.
171
172        Expected Result:
173        Verify that read update PHY worked properly each time.
174
175        Returns:
176          Pass if True
177          Fail if False
178
179        TAGS: LE, PHY
180        Priority: 0
181        """
182        self.cen_ad.droid.gattClientRequestConnectionPriority(
183            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
184        self.pop_initial_phy_update()
185
186        txPhyB, rxPhyB = self.read_client_phy()
187        txPhyA = (txPhyB == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M
188        rxPhyA = (rxPhyB == PHY_LE_1M) and PHY_LE_2M or PHY_LE_1M
189
190        self.log.info("Initial connection PHY was: " + lfmt(txPhyB, rxPhyB))
191
192        for i in range(20):
193            #swap values between iterations
194            txPhy = (i & 1) and txPhyB or txPhyA
195            rxPhy = (i & 1) and rxPhyB or rxPhyA
196
197            self.log.info("Will try to set PHY to: " + lfmt(txPhy, rxPhy))
198            self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
199                                                        txPhy, rxPhy, 0)
200            self.ensure_both_updated_phy(txPhy, rxPhy)
201        return True
202
203    @BluetoothBaseTest.bt_test_wrap
204    @test_tracker_info(uuid='13f28de4-07f4-458c-a3e5-3ba95318616f')
205    def test_phy_change_asym(self):
206        """Test PHY update with asymetric rx and tx PHY.
207
208        Test PHY update with asymetric rx and tx PHY.
209
210        Steps:
211        1. Central: read PHY.
212        2. Central: update PHY to tx: 1M, rx: 2M, ensure both devices received
213                    the asymetric update.
214        3. Central: update PHY to tx: 2M, rx: 1M, ensure both devices received
215                    the asymetric update.
216
217        Expected Result:
218        Verify that read update PHY worked properly each time.
219
220        Returns:
221          Pass if True
222          Fail if False
223
224        TAGS: LE, PHY
225        Priority: 0
226        """
227        self.cen_ad.droid.gattClientRequestConnectionPriority(
228            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
229        self.pop_initial_phy_update()
230
231        txPhy, rxPhy = self.read_client_phy()
232
233        self.log.info("Initial connection PHY was: " + lfmt(txPhy, rxPhy))
234        self.log.info("will try to set PHY to: PHY_LE_1M, PHY_LE_2M")
235
236        #try to update PHY to tx 1M, rx 2M from Client
237        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
238                                                    PHY_LE_1M, PHY_LE_2M, 0)
239        self.ensure_both_updated_phy(PHY_LE_1M, PHY_LE_2M)
240
241        #try to update PHY to TX 2M, RX 1M from Client
242        self.log.info("will try to set PHY to: PHY_LE_2M, PHY_LE_1M")
243        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
244                                                    PHY_LE_2M, PHY_LE_1M, 0)
245        self.ensure_both_updated_phy(PHY_LE_2M, PHY_LE_1M)
246
247        return True
248