#/usr/bin/env python3.4
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
This test script exercises set PHY and read PHY procedures.
"""

from queue import Empty

from acts.test_decorators import test_tracker_info
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.GattConnectedBaseTest import GattConnectedBaseTest
from acts.test_utils.bt.bt_constants import gatt_connection_priority
from acts.test_utils.bt.bt_constants import gatt_event
from acts.test_utils.bt.bt_constants import gatt_phy
from acts import signals

CONNECTION_PRIORITY_HIGH = gatt_connection_priority['high']
PHY_LE_1M = gatt_phy['1m']
PHY_LE_2M = gatt_phy['2m']


def _phy_name(phy):
    """Return the gatt_phy key whose value equals phy.

    Falls back to str(phy) when the value is not present in gatt_phy, so
    that formatting a log line never aborts a test on an unexpected PHY.
    """
    return next((name for name, value in gatt_phy.items() if value == phy),
                str(phy))


def _toggle_phy(phy):
    """Return PHY_LE_2M if phy is PHY_LE_1M, otherwise PHY_LE_1M."""
    return PHY_LE_2M if phy == PHY_LE_1M else PHY_LE_1M


def lfmt(txPhy, rxPhy):
    """Format a (tx, rx) PHY pair for logging, e.g. '(1m, 2m)'.

    Args:
        txPhy: transmit PHY value, normally one of the gatt_phy values.
        rxPhy: receive PHY value, normally one of the gatt_phy values.

    Returns:
        A string of the form '(<tx name>, <rx name>)'. Values that are not
        found in gatt_phy are rendered with str().
    """
    return '(' + _phy_name(txPhy) + ', ' + _phy_name(rxPhy) + ')'


class PhyTest(GattConnectedBaseTest):
    """Tests for the LE set PHY and read PHY procedures."""

    def setup_class(self):
        """Skip the whole class unless both devices support LE 2M PHY.

        Raises:
            signals.TestSkipClass: if either the central or the peripheral
                reports that LE 2M PHY is not supported.
        """
        # NOTE(review): this override does not call super().setup_class();
        # confirm that no base-class setup is being skipped unintentionally.
        if not self.cen_ad.droid.bluetoothIsLe2MPhySupported():
            raise signals.TestSkipClass(
                "Central device does not support LE 2M PHY")

        if not self.per_ad.droid.bluetoothIsLe2MPhySupported():
            raise signals.TestSkipClass(
                "Peripheral device does not support LE 2M PHY")

    # Some controllers auto-update PHY to 2M, and both client and server
    # might receive PHY Update event right after connection, if the
    # connection was established over 1M PHY. We will ignore this event, but
    # must pop it from queue.
    def pop_initial_phy_update(self):
        """Discard a pending PHY update event on client and server, if any."""
        client_event = gatt_event['phy_update']['evt'].format(
            self.gatt_callback)
        try:
            self.cen_ad.ed.pop_event(client_event, 0)
        except Empty:
            # Nothing queued on the client; that is an acceptable outcome.
            pass

        server_event = gatt_event['serv_phy_update']['evt'].format(
            self.gatt_server_callback)
        try:
            self.per_ad.ed.pop_event(server_event, 0)
        except Empty:
            # Nothing queued on the server; that is an acceptable outcome.
            pass

    def _assert_phy_event(self, event, label, expectedTxPhy, expectedRxPhy):
        """Log a PHY update event and assert its status and PHY values."""
        data = event['data']
        self.log.info(label + lfmt(data['TxPhy'], data['RxPhy']))
        self.assertEqual(0, data['Status'], "Status should be 0")
        self.assertEqual(expectedTxPhy, data['TxPhy'])
        self.assertEqual(expectedRxPhy, data['RxPhy'])

    # This helper method checks whether both client and server received PHY
    # update event with proper txPhy and rxPhy.
    def ensure_both_updated_phy(self, clientTxPhy, clientRxPhy):
        """Assert that client and server both report the expected PHY update.

        The server side is expected to be the mirror image of the client:
        its TxPhy must equal clientRxPhy and its RxPhy must equal
        clientTxPhy.

        Args:
            clientTxPhy: TxPhy expected in the client's update event.
            clientRxPhy: RxPhy expected in the client's update event.
        """
        client_event = self._client_wait(gatt_event['phy_update'])
        self._assert_phy_event(client_event, "\tClient PHY updated: ",
                               clientTxPhy, clientRxPhy)

        server_event = self._server_wait(gatt_event['serv_phy_update'])
        self._assert_phy_event(server_event, "\tServer PHY updated: ",
                               clientRxPhy, clientTxPhy)

    # read the client phy, return (txPhy, rxPhy)
    def read_client_phy(self):
        """Read the PHY on the client and return it as (txPhy, rxPhy)."""
        self.cen_ad.droid.gattClientReadPhy(self.bluetooth_gatt)
        data = self._client_wait(gatt_event['phy_read'])['data']
        self.assertEqual(0, data['Status'], "Status should be 0")
        return (data['TxPhy'], data['RxPhy'])

    # read the server phy, return (txPhy, rxPhy)
    def read_server_phy(self):
        """Read the PHY on the server and return it as (txPhy, rxPhy)."""
        bt_device_id = 0
        self.per_ad.droid.gattServerReadPhy(self.gatt_server, bt_device_id)
        data = self._server_wait(gatt_event['serv_phy_read'])['data']
        self.assertEqual(0, data['Status'], "Status should be 0")
        return (data['TxPhy'], data['RxPhy'])

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='edb95ae1-97e5-4337-9a60-1e113aa43a4d')
    def test_phy_read(self):
        """Test LE read PHY.

        Test LE read PHY.

        Steps:
        1. Central, Peripheral : read PHY, make sure values are same.
        2. Central: update PHY.
        3. Ensure both Central and Peripheral received PHY update event.
        4. Central, Peripheral: read PHY, make sure values are same.

        Expected Result:
        Verify that read PHY works properly.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, PHY
        Priority: 0
        """
        self.cen_ad.droid.gattClientRequestConnectionPriority(
            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
        self.pop_initial_phy_update()

        # read phy from client and server, make sure they're same
        client_tx, client_rx = self.read_client_phy()
        server_tx, server_rx = self.read_server_phy()
        self.assertEqual(client_tx, server_tx)
        self.assertEqual(client_rx, server_rx)

        self.log.info("Initial connection PHY was: " + lfmt(
            client_tx, client_rx))

        # Pick the opposite PHY for each direction so that the update below
        # is an actual change.
        next_tx = _toggle_phy(client_tx)
        next_rx = _toggle_phy(client_rx)

        # try to update PHY from Client
        self.log.info("Will try to set PHY to: " + lfmt(next_tx, next_rx))
        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
                                                    next_tx, next_rx, 0)
        self.ensure_both_updated_phy(next_tx, next_rx)

        # read phy on client and server, make sure values are same and equal
        # the newly set value
        client_tx, client_rx = self.read_client_phy()
        server_tx, server_rx = self.read_server_phy()
        self.assertEqual(client_tx, server_tx)
        self.assertEqual(client_rx, server_rx)

        self.assertEqual(next_tx, client_tx)
        self.assertEqual(next_rx, client_rx)
        return True

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='6b66af0a-35eb-42af-acd5-9634684f275d')
    def test_phy_change_20_times(self):
        """Test PHY update.

        Test LE PHY update.

        Steps:
        1. Central: read PHY.
        2. Central: update PHY to 1M, 2M, 1M... 20 times, each time ensuring
                    both client and server received PHY update event.

        Expected Result:
        Verify that PHY update worked properly each time.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, PHY
        Priority: 0
        """
        self.cen_ad.droid.gattClientRequestConnectionPriority(
            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
        self.pop_initial_phy_update()

        initial_tx, initial_rx = self.read_client_phy()
        toggled_tx = _toggle_phy(initial_tx)
        toggled_rx = _toggle_phy(initial_rx)

        self.log.info("Initial connection PHY was: " + lfmt(
            initial_tx, initial_rx))

        for i in range(20):
            # Swap values between iterations: even iterations request the
            # toggled PHY, odd iterations go back to the initial one.
            tx_phy = initial_tx if i % 2 else toggled_tx
            rx_phy = initial_rx if i % 2 else toggled_rx

            self.log.info("Will try to set PHY to: " + lfmt(tx_phy, rx_phy))
            self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
                                                        tx_phy, rx_phy, 0)
            self.ensure_both_updated_phy(tx_phy, rx_phy)
        return True

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='13f28de4-07f4-458c-a3e5-3ba95318616f')
    def test_phy_change_asym(self):
        """Test PHY update with asymmetric rx and tx PHY.

        Test PHY update with asymmetric rx and tx PHY.

        Steps:
        1. Central: read PHY.
        2. Central: update PHY to tx: 1M, rx: 2M, ensure both devices received
                    the asymmetric update.
        3. Central: update PHY to tx: 2M, rx: 1M, ensure both devices received
                    the asymmetric update.

        Expected Result:
        Verify that PHY update worked properly each time.

        Returns:
          Pass if True
          Fail if False

        TAGS: LE, PHY
        Priority: 0
        """
        self.cen_ad.droid.gattClientRequestConnectionPriority(
            self.bluetooth_gatt, CONNECTION_PRIORITY_HIGH)
        self.pop_initial_phy_update()

        initial_tx, initial_rx = self.read_client_phy()

        self.log.info("Initial connection PHY was: " + lfmt(
            initial_tx, initial_rx))
        self.log.info("will try to set PHY to: PHY_LE_1M, PHY_LE_2M")

        # try to update PHY to tx 1M, rx 2M from Client
        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
                                                    PHY_LE_1M, PHY_LE_2M, 0)
        self.ensure_both_updated_phy(PHY_LE_1M, PHY_LE_2M)

        # try to update PHY to TX 2M, RX 1M from Client
        self.log.info("will try to set PHY to: PHY_LE_2M, PHY_LE_1M")
        self.cen_ad.droid.gattClientSetPreferredPhy(self.bluetooth_gatt,
                                                    PHY_LE_2M, PHY_LE_1M, 0)
        self.ensure_both_updated_phy(PHY_LE_2M, PHY_LE_1M)

        return True