1#
2# Copyright 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""Script for sending testing parameters and commands to a Bluetooth device.
17
18This script provides a simple shell interface for sending data at run-time to a
19Bluetooth device. It is intended to be used in tandem with the test vendor
20library project.
21
22Usage:
23  Option A: Script
24    1. Run build_and_run.sh in scripts/ with the --test-channel flag set and the
25    port to use for the test channel.
26  Option B: Manual
27    1. Choose a port to use for the test channel. Use 'adb forward tcp:<port>
28    tcp:<port>' to forward the port to the device.
29    2. In a separate shell, build and push the test vendor library to the device
30    using the script mentioned in option A (i.e. without the --test-channel flag
31    set).
32    3. Once logcat has started, turn Bluetooth on from the device.
33    4. Run this program, in the shell from step 1,  the port, also from step 1,
34    as arguments.
35"""
36
37#!/usr/bin/env python3
38
39import cmd
40import random
41import socket
42import string
43import struct
44import sys
45import time
46
47DEVICE_NAME_LENGTH = 6
48DEVICE_ADDRESS_LENGTH = 6
49
50
51# Used to generate fake device names and addresses during discovery.
52def generate_random_name():
53    return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
54      string.digits) for _ in range(DEVICE_NAME_LENGTH))
55
56
57def generate_random_address():
58    return ''.join(random.SystemRandom().choice(string.digits) for _ in \
59      range(DEVICE_ADDRESS_LENGTH))
60
61
62class Connection(object):
63    """Simple wrapper class for a socket object.
64
65  Attributes:
66    socket: The underlying socket created for the specified address and port.
67  """
68
69    def __init__(self, port):
70        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
71        self._socket.connect(('localhost', port))
72
73    def close(self):
74        self._socket.close()
75
76    def send(self, data):
77        self._socket.sendall(data.encode())
78
79    def receive(self, size):
80        return self._socket.recv(size)
81
82
83class TestChannel(object):
84    """Checks outgoing commands and sends them once verified.
85
86  Attributes:
87    connection: The connection to the test vendor library that commands are sent
88      on.
89  """
90
91    def __init__(self, port):
92        self._connection = Connection(port)
93        self._closed = False
94
95    def close(self):
96        self._connection.close()
97        self._closed = True
98
99    def send_command(self, name, args):
100        name_size = len(name)
101        args_size = len(args)
102        self.lint_command(name, args, name_size, args_size)
103        encoded_name = chr(name_size) + name
104        encoded_args = chr(args_size) + ''.join(chr(len(arg)) + arg for arg in args)
105        command = encoded_name + encoded_args
106        if self._closed:
107            return
108        self._connection.send(command)
109        if name != 'CLOSE_TEST_CHANNEL':
110            print(self.receive_response().decode())
111
112    def receive_response(self):
113        if self._closed:
114            return b'Closed'
115        size_chars = self._connection.receive(4)
116        size_bytes = bytearray(size_chars)
117        if not size_chars:
118            return b'No response, assuming that the connection is broken'
119        response_size = 0
120        for i in range(0, len(size_chars) - 1):
121            response_size |= (size_chars[i] << (8 * i))
122        response = self._connection.receive(response_size)
123        return response
124
125    def lint_command(self, name, args, name_size, args_size):
126        assert name_size == len(name) and args_size == len(args)
127        try:
128            name.encode()
129            for arg in args:
130                arg.encode()
131        except UnicodeError:
132            print('Unrecognized characters.')
133            raise
134        if name_size > 255 or args_size > 255:
135            raise ValueError  # Size must be encodable in one octet.
136        for arg in args:
137            if len(arg) > 255:
138                raise ValueError  # Size must be encodable in one octet.
139
140
141class TestChannelShell(cmd.Cmd):
142    """Shell for sending test channel data to controller.
143
144  Manages the test channel to the controller and defines a set of commands the
145  user can send to the controller as well. These commands are processed parallel
146  to commands sent from the device stack and used to provide additional
147  debugging/testing capabilities.
148
149  Attributes:
150    test_channel: The communication channel to send data to the controller.
151  """
152
153    def __init__(self, test_channel):
154        cmd.Cmd.__init__(self)
155        self._test_channel = test_channel
156
157    def do_add(self, args):
158        """Arguments: dev_type_str Add a new device of type dev_type_str.
159
160    """
161        self._test_channel.send_command('add', args.split())
162
163    def do_del(self, args):
164        """Arguments: device index Delete the device with the specified index.
165
166    """
167        self._test_channel.send_command('del', args.split())
168
169    def do_add_phy(self, args):
170        """Arguments: dev_type_str Add a new device of type dev_type_str.
171
172    """
173        self._test_channel.send_command('add_phy', args.split())
174
175    def do_del_phy(self, args):
176        """Arguments: phy index Delete the phy with the specified index.
177
178    """
179        self._test_channel.send_command('del_phy', args.split())
180
181    def do_add_device_to_phy(self, args):
182        """Arguments: device index phy index Add a new device of type dev_type_str.
183
184    """
185        self._test_channel.send_command('add_device_to_phy', args.split())
186
187    def do_del_device_from_phy(self, args):
188        """Arguments: phy index Delete the phy with the specified index.
189
190    """
191        self._test_channel.send_command('del_device_from_phy', args.split())
192
193    def do_add_remote(self, args):
194        """Arguments: dev_type_str Connect to a remote device at arg1@arg2.
195
196    """
197        self._test_channel.send_command('add_remote', args.split())
198
199    def do_get(self, args):
200        """Arguments: dev_num attr_str Get the value of the attribute attr_str from device dev_num.
201
202    """
203        self._test_channel.send_command('get', args.split())
204
205    def do_set(self, args):
206        """Arguments: dev_num attr_str val Set the value of the attribute attr_str from device dev_num equal to val.
207
208    """
209        self._test_channel.send_command('set', args.split())
210
211    def do_set_device_address(self, args):
212        """Arguments: dev_num addr Set the address of device dev_num equal to addr.
213
214    """
215        self._test_channel.send_command('set_device_address', args.split())
216
217    def do_set_device_configuration(self, args):
218        """Arguments: dev_num config Set the controller properties of the selected device.
219
220    """
221        self._test_channel.send_command('set_device_configuration', args.split())
222
223    def do_list(self, args):
224        """Arguments: [dev_num [attr]] List the devices from the controller, optionally filtered by device and attr.
225
226    """
227        self._test_channel.send_command('list', args.split())
228
229    def do_set_timer_period(self, args):
230        """Arguments: period_ms Set the timer to fire every period_ms milliseconds
231    """
232        self._test_channel.send_command('set_timer_period', args.split())
233
234    def do_start_timer(self, args):
235        """Arguments: None. Start the timer.
236    """
237        self._test_channel.send_command('start_timer', args.split())
238
239    def do_stop_timer(self, args):
240        """Arguments: None. Stop the timer.
241    """
242        self._test_channel.send_command('stop_timer', args.split())
243
244    def do_wait(self, args):
245        """Arguments: time in seconds (float).
246    """
247        sleep_time = float(args.split()[0])
248        time.sleep(sleep_time)
249
250    def do_reset(self, args):
251        """Arguments: None.
252
253    Resets the simulation.
254    """
255        self._test_channel.send_command('reset', [])
256
257    def do_end(self, args):
258        """Arguments: None.
259
260    Ends the simulation and exits.
261    """
262        self._test_channel.send_command('END_SIMULATION', [])
263        print('Goodbye.')
264        return True
265
266    def do_quit(self, args):
267        """Arguments: None.
268
269    Exits the test channel.
270    """
271        self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
272        self._test_channel.close()
273        print('Goodbye.')
274        return True
275
276    def do_help(self, args):
277        """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
278
279    """
280        if (len(args) == 0):
281            cmd.Cmd.do_help(self, args)
282        else:
283            self._test_channel.send_command('help', args.split())
284
285    def preloop(self):
286        """Clear out the buffer
287
288    """
289        response = self._test_channel.receive_response()
290
291    #def postcmd(self, stop, line):
292    #"""
293    #Called after each command
294    #stop : whether we will stop after this command
295    #line : the previous input line
296    #Return True to stop, False to continue
297    #"""
298    #if stop:
299    #return True
300    #response = self._test_channel.receive_response()
301    #if not response:
302    #return True
303    #print response
304    #return False
305
306
307def main(argv):
308    if len(argv) != 2:
309        print('Usage: python test_channel.py [port]')
310        return
311    try:
312        port = int(argv[1])
313    except ValueError:
314        print('Error parsing port.')
315    else:
316        try:
317            test_channel = TestChannel(port)
318        except socket.error as e:
319            print('Error connecting to socket: %s' % e)
320        except:
321            print('Error creating test channel (check argument).')
322        else:
323            test_channel_shell = TestChannelShell(test_channel)
324            test_channel_shell.prompt = '$ '
325            test_channel_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.')
326
327
328if __name__ == '__main__':
329    main(sys.argv)
330