1# 2# Copyright 2015 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16"""Script for sending testing parameters and commands to a Bluetooth device. 17 18This script provides a simple shell interface for sending data at run-time to a 19Bluetooth device. It is intended to be used in tandem with the test vendor 20library project. 21 22Usage: 23 Option A: Script 24 1. Run build_and_run.sh in scripts/ with the --test-channel flag set and the 25 port to use for the test channel. 26 Option B: Manual 27 1. Choose a port to use for the test channel. Use 'adb forward tcp:<port> 28 tcp:<port>' to forward the port to the device. 29 2. In a separate shell, build and push the test vendor library to the device 30 using the script mentioned in option A (i.e. without the --test-channel flag 31 set). 32 3. Once logcat has started, turn Bluetooth on from the device. 33 4. Run this program, in the shell from step 1, the port, also from step 1, 34 as arguments. 35""" 36 37#!/usr/bin/env python3 38 39import cmd 40import random 41import socket 42import string 43import struct 44import sys 45import time 46 47DEVICE_NAME_LENGTH = 6 48DEVICE_ADDRESS_LENGTH = 6 49 50 51# Used to generate fake device names and addresses during discovery. 52def generate_random_name(): 53 return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ 54 string.digits) for _ in range(DEVICE_NAME_LENGTH)) 55 56 57def generate_random_address(): 58 return ''.join(random.SystemRandom().choice(string.digits) for _ in \ 59 range(DEVICE_ADDRESS_LENGTH)) 60 61 62class Connection(object): 63 """Simple wrapper class for a socket object. 64 65 Attributes: 66 socket: The underlying socket created for the specified address and port. 67 """ 68 69 def __init__(self, port): 70 self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 71 self._socket.connect(('localhost', port)) 72 73 def close(self): 74 self._socket.close() 75 76 def send(self, data): 77 self._socket.sendall(data.encode()) 78 79 def receive(self, size): 80 return self._socket.recv(size) 81 82 83class TestChannel(object): 84 """Checks outgoing commands and sends them once verified. 85 86 Attributes: 87 connection: The connection to the test vendor library that commands are sent 88 on. 89 """ 90 91 def __init__(self, port): 92 self._connection = Connection(port) 93 self._closed = False 94 95 def close(self): 96 self._connection.close() 97 self._closed = True 98 99 def send_command(self, name, args): 100 name_size = len(name) 101 args_size = len(args) 102 self.lint_command(name, args, name_size, args_size) 103 encoded_name = chr(name_size) + name 104 encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args) 105 command = encoded_name + encoded_args 106 if self._closed: 107 return 108 self._connection.send(command) 109 if name != 'CLOSE_TEST_CHANNEL': 110 print(self.receive_response().decode()) 111 112 def receive_response(self): 113 if self._closed: 114 return b'Closed' 115 size_chars = self._connection.receive(4) 116 size_bytes = bytearray(size_chars) 117 if not size_chars: 118 return b'No response, assuming that the connection is broken' 119 response_size = 0 120 for i in range(0, len(size_chars) - 1): 121 response_size |= (size_chars[i] << (8 * i)) 122 response = self._connection.receive(response_size) 123 return response 124 125 def lint_command(self, name, args, name_size, args_size): 126 assert name_size == len(name) and args_size == len(args) 127 try: 128 name.encode() 129 for arg in args: 130 arg.encode() 131 except UnicodeError: 132 print('Unrecognized characters.') 133 raise 134 if name_size > 255 or args_size > 255: 135 raise ValueError # Size must be encodable in one octet. 136 for arg in args: 137 if len(arg) > 255: 138 raise ValueError # Size must be encodable in one octet. 139 140 141class TestChannelShell(cmd.Cmd): 142 """Shell for sending test channel data to controller. 143 144 Manages the test channel to the controller and defines a set of commands the 145 user can send to the controller as well. These commands are processed parallel 146 to commands sent from the device stack and used to provide additional 147 debugging/testing capabilities. 148 149 Attributes: 150 test_channel: The communication channel to send data to the controller. 151 """ 152 153 def __init__(self, test_channel): 154 cmd.Cmd.__init__(self) 155 self._test_channel = test_channel 156 157 def do_add(self, args): 158 """Arguments: dev_type_str Add a new device of type dev_type_str. 159 160 """ 161 self._test_channel.send_command('add', args.split()) 162 163 def do_del(self, args): 164 """Arguments: device index Delete the device with the specified index. 165 166 """ 167 self._test_channel.send_command('del', args.split()) 168 169 def do_add_phy(self, args): 170 """Arguments: dev_type_str Add a new device of type dev_type_str. 171 172 """ 173 self._test_channel.send_command('add_phy', args.split()) 174 175 def do_del_phy(self, args): 176 """Arguments: phy index Delete the phy with the specified index. 177 178 """ 179 self._test_channel.send_command('del_phy', args.split()) 180 181 def do_add_device_to_phy(self, args): 182 """Arguments: device index phy index Add a new device of type dev_type_str. 183 184 """ 185 self._test_channel.send_command('add_device_to_phy', args.split()) 186 187 def do_del_device_from_phy(self, args): 188 """Arguments: phy index Delete the phy with the specified index. 189 190 """ 191 self._test_channel.send_command('del_device_from_phy', args.split()) 192 193 def do_add_remote(self, args): 194 """Arguments: dev_type_str Connect to a remote device at arg1@arg2. 195 196 """ 197 self._test_channel.send_command('add_remote', args.split()) 198 199 def do_get(self, args): 200 """Arguments: dev_num attr_str Get the value of the attribute attr_str from device dev_num. 201 202 """ 203 self._test_channel.send_command('get', args.split()) 204 205 def do_set(self, args): 206 """Arguments: dev_num attr_str val Set the value of the attribute attr_str from device dev_num equal to val. 207 208 """ 209 self._test_channel.send_command('set', args.split()) 210 211 def do_set_device_address(self, args): 212 """Arguments: dev_num addr Set the address of device dev_num equal to addr. 213 214 """ 215 self._test_channel.send_command('set_device_address', args.split()) 216 217 def do_set_device_configuration(self, args): 218 """Arguments: dev_num config Set the controller properties of the selected device. 219 220 """ 221 self._test_channel.send_command('set_device_configuration', args.split()) 222 223 def do_list(self, args): 224 """Arguments: [dev_num [attr]] List the devices from the controller, optionally filtered by device and attr. 225 226 """ 227 self._test_channel.send_command('list', args.split()) 228 229 def do_set_timer_period(self, args): 230 """Arguments: period_ms Set the timer to fire every period_ms milliseconds 231 """ 232 self._test_channel.send_command('set_timer_period', args.split()) 233 234 def do_start_timer(self, args): 235 """Arguments: None. Start the timer. 236 """ 237 self._test_channel.send_command('start_timer', args.split()) 238 239 def do_stop_timer(self, args): 240 """Arguments: None. Stop the timer. 241 """ 242 self._test_channel.send_command('stop_timer', args.split()) 243 244 def do_wait(self, args): 245 """Arguments: time in seconds (float). 246 """ 247 sleep_time = float(args.split()[0]) 248 time.sleep(sleep_time) 249 250 def do_reset(self, args): 251 """Arguments: None. 252 253 Resets the simulation. 254 """ 255 self._test_channel.send_command('reset', []) 256 257 def do_end(self, args): 258 """Arguments: None. 259 260 Ends the simulation and exits. 261 """ 262 self._test_channel.send_command('END_SIMULATION', []) 263 print('Goodbye.') 264 return True 265 266 def do_quit(self, args): 267 """Arguments: None. 268 269 Exits the test channel. 270 """ 271 self._test_channel.send_command('CLOSE_TEST_CHANNEL', []) 272 self._test_channel.close() 273 print('Goodbye.') 274 return True 275 276 def do_help(self, args): 277 """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. 278 279 """ 280 if (len(args) == 0): 281 cmd.Cmd.do_help(self, args) 282 else: 283 self._test_channel.send_command('help', args.split()) 284 285 def preloop(self): 286 """Clear out the buffer 287 288 """ 289 response = self._test_channel.receive_response() 290 291 #def postcmd(self, stop, line): 292 #""" 293 #Called after each command 294 #stop : whether we will stop after this command 295 #line : the previous input line 296 #Return True to stop, False to continue 297 #""" 298 #if stop: 299 #return True 300 #response = self._test_channel.receive_response() 301 #if not response: 302 #return True 303 #print response 304 #return False 305 306 307def main(argv): 308 if len(argv) != 2: 309 print('Usage: python test_channel.py [port]') 310 return 311 try: 312 port = int(argv[1]) 313 except ValueError: 314 print('Error parsing port.') 315 else: 316 try: 317 test_channel = TestChannel(port) 318 except socket.error as e: 319 print('Error connecting to socket: %s' % e) 320 except: 321 print('Error creating test channel (check argument).') 322 else: 323 test_channel_shell = TestChannelShell(test_channel) 324 test_channel_shell.prompt = '$ ' 325 test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.') 326 327 328if __name__ == '__main__': 329 main(sys.argv) 330