1#!/usr/bin/env python3 2############################################################################################################### 3# 4# Copyright (C) 2019 Motorola Mobility LLC 5# 6# Redistribution and use in source and binary forms, with or without modification, are permitted provided that 7# the following conditions are met: 8# 9# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the 10# following disclaimer. 11# 12# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and 13# the following disclaimer in the documentation and/or other materials provided with the distribution. 14# 15# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or 16# promote products derived from this software without specific prior written permission. 17# 18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED 19# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A 20# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR 21# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED 22# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 24# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 25# POSSIBILITY OF SUCH DAMAGE. 
26# 27############################################################################################################### 28############################################################################################################### 29# 30# Bluetooth Virtual Sniffing for Android 31# 32# This script supports Bluetooth Virtual Sniffing via Live Import Feature of Frontline Bluetooth Sniffer(FTS) 33# 34# It extracts the HCI packets from Snoop logs and redirect it to FTS for live HCI sniffing. 35# 36# It uses liveimport.ini and LiveImportAPI.dll from FTS path to communicate with FTS sniffer software. 37# 38# It works on both Windows and Ubuntu. For Ubuntu, both FTS and Python should be installed on Wine. 39# 40# FTS_INI_PATH & FTS_DLL_PATH should be set to absolute path of liveimport.ini and LiveImportAPI.dll of FTS 41# 42# Example below - This may change per machine per FTS version in Windows vs Ubuntu (Wine), set accordingly. 43# For Windows 44# FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 13.2\\' 45# FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 13.2\\Executables\\Core\\' 46# 47# For Ubuntu - FTS path recognized by Wine (not Ubuntu path) 48# FTS_INI_PATH = 'C:\\Program Files\\Frontline Test System II\\Frontline 13.2\\' 49# FTS_DLL_PATH = 'C:\\Program Files\\Frontline Test System II\\Frontline 13.2\\Executables\\Core\\' 50# 51############################################################################################################### 52 53import os 54import platform 55import socket 56import struct 57import subprocess 58import sys 59import time 60if sys.version_info[0] >= 3: 61 import configparser 62else: 63 import ConfigParser as configparser 64 65from calendar import timegm 66from ctypes import byref, c_bool, c_longlong, CDLL 67from _ctypes import FreeLibrary 68from datetime import datetime 69 70# Update below to right path corresponding to your machine, FTS version and OS used. 
FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\'
FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\Executables\\Core\\'

iniName = 'liveimport.ini'
# Pick the Live Import DLL that matches the bitness of the running interpreter.
if platform.architecture()[0] == '32bit':
    dllName = 'LiveImportAPI.dll'
else:
    dllName = 'LiveImportAPI_x64.dll'

# Command line that starts FTS; every argument is wrapped in double quotes.
launchFtsCmd = ('"' + FTS_DLL_PATH + 'fts.exe"' +
                ' "/ComProbe Protocol Analysis System=Generic"' +
                ' "/oemkey=Virtual"')

# Number of 100 ns intervals between the FILETIME epoch (1601-01-01) and the
# Unix epoch (1970-01-01).
FILETIME_EPOCH_DELTA = 116444736000000000
# Number of 100 ns intervals in one second.
HUNDREDS_OF_NANOSECONDS = 10000000

HOST = 'localhost'
PORT = 8872
# Size in bytes of the stream identification block read right after connecting.
SNOOP_ID = 16
# Size in bytes of the header that precedes every snoop record.
SNOOP_HDR = 24

# FTS readiness polling: 12 attempts, 5 seconds apart (about one minute).
FTS_POLL_ATTEMPTS = 12
FTS_POLL_INTERVAL_SEC = 5

# First payload byte (packet type) -> (drf, isend) handed to SendFrame.
# An isend of None means "derive it from bit 0 of the record flags".
# NOTE(review): 1..4 look like the HCI UART (H4) packet indicators
# (command, ACL, SCO, event) -- confirm against the snoop producer.
FRAME_INFO_BY_PACKET_TYPE = {
    1: (1, 0),
    2: (2, None),
    3: (4, None),
    4: (8, 1),
}


def get_file_time():
    """
    Obtain current time in file time format for display

    Returns the number of 100 ns intervals since 1601-01-01 as an integer.
    The naive local time from datetime.now() is fed to timegm(), which treats
    it as UTC, so the result encodes the local wall-clock reading.
    NOTE(review): presumably intentional so FTS shows local time -- confirm.
    """
    now = datetime.now()
    whole_seconds = timegm(now.timetuple())
    file_time = FILETIME_EPOCH_DELTA + whole_seconds * HUNDREDS_OF_NANOSECONDS
    # One microsecond is ten 100 ns intervals.
    return file_time + now.microsecond * 10


def load_live_import_ini():
    """
    Parse liveimport.ini from FTS_INI_PATH and return the parser object.

    A missing file is not an error here: ConfigParser.read() skips files it
    cannot open, so the caller sees an empty parser instead.
    """
    config = configparser.ConfigParser()
    config.read(FTS_INI_PATH + iniName)
    return config


def get_connection_string():
    """
    Read ConnectionString from liveimport.ini

    Returns the value of General/ConnectionString, or None when the file,
    section or option is missing or the file cannot be parsed.
    """
    try:
        return load_live_import_ini().get('General', 'ConnectionString')
    except configparser.Error:
        return None


def get_configuration_string():
    """
    Read Configuration string from liveimport.ini

    Returns every option of the Configuration section as "key=value" lines,
    each terminated by a newline, or None when the file or section is missing
    or the file cannot be parsed. An empty section yields an empty string.
    """
    try:
        config_items = load_live_import_ini().items('Configuration')
    except configparser.Error:
        return None

    return ''.join("%s=%s\n" % (key, value) for key, value in config_items)


def check_live_import_connection(live_import):
    """
    Launch FTS app in Virtual Sniffing Mode
    Check if FTS App is ready to start receiving the data.
    If not, wait until 1 min and exit if FTS didn't start.

    live_import is the loaded Live Import DLL. Returns True once FTS reports
    it is ready, False when FTS cannot be launched, IsAppReady reports an
    error, or FTS is still not ready after all polling attempts.
    """
    is_connection_running = c_bool()

    # The status of this first probe is not examined (unchanged behavior);
    # only the readiness flag decides whether FTS has to be launched.
    live_import.IsAppReady(byref(is_connection_running))
    if is_connection_running.value:
        print("FTS is already launched, Start capture if not already started")
        return True

    print("Launching FTS Virtual Sniffing")
    try:
        # Hold on to the Popen object so its stdout pipe is not closed while
        # this function is still polling.
        fts_process = subprocess.Popen(launchFtsCmd, stdout=subprocess.PIPE)  # noqa: F841
    except (OSError, ValueError):
        print("Error in Launching FTS.. exiting")
        return False

    for _ in range(FTS_POLL_ATTEMPTS):
        status = live_import.IsAppReady(byref(is_connection_running))
        if status < 0:
            print("Live Import Internal Error %d" % (status))
            return False
        if is_connection_running.value:
            print("FTS is ready to receive the data, Start capture now")
            return True
        print("Waiting for 5 sec.. Open FTS Virtual Sniffing")
        time.sleep(FTS_POLL_INTERVAL_SEC)

    print("FTS Virtual Sniffing didn't start until 1 min.. exiting")
    return False


def init_live_import(conn_str, config_str):
    """
    Load DLL and Initialize the LiveImport module for FTS.

    conn_str and config_str are text strings; they are sent to the DLL as
    ASCII with non-ASCII characters dropped. Returns the loaded library on
    success, or None when the DLL cannot be loaded or initialization fails.
    """
    success = c_bool()
    try:
        live_import = CDLL(FTS_DLL_PATH + dllName)
    except OSError as err:
        # CDLL raises instead of returning None, so this is the only place a
        # wrong path or a bitness mismatch can be reported.
        print("Error: Unable to load %s (%s)" % (dllName, err))
        return None

    print(dllName + " loaded successfully")
    # NOTE(review): the 'success' out-parameter is filled in by the DLL but,
    # as before, only the return code decides the outcome -- confirm whether
    # it should be checked as well.
    result = live_import.InitializeLiveImport(
        conn_str.encode('ascii', 'ignore'),
        config_str.encode('ascii', 'ignore'),
        byref(success))
    if result < 0:
        print("Live Import Init failed")
        return None

    print("Live Import Init success")
    return live_import


def release_live_import(live_import):
    """
    Cleanup and exit live import module.

    Does nothing when live_import is None.
    """
    if live_import is None:
        return
    live_import.ReleaseLiveImport()
    FreeLibrary(live_import._handle)


def recv_exact(sock, length):
    """
    Read exactly 'length' bytes from a stream socket.

    A single recv() may return fewer bytes than requested, so keep reading
    until the full amount has arrived. Returns the bytes read, or None when
    the peer closes the connection first (recv() returning an empty result).
    """
    chunks = []
    remaining = length
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def get_frame_info(packet_type, flags):
    """
    Map a packet type byte and record flags to the SendFrame arguments.

    Returns a (drf, isend) tuple, or None for a packet type that has no entry
    in FRAME_INFO_BY_PACKET_TYPE.
    """
    entry = FRAME_INFO_BY_PACKET_TYPE.get(packet_type)
    if entry is None:
        return None
    drf, isend = entry
    if isend is None:
        isend = 1 if (flags & 0x01) else 0
    return drf, isend


def forward_snoop_frames(live_import, btsnoop_sock):
    """
    Read snoop records from the socket and hand each one to FTS.

    Runs until the connection is closed by the peer, then returns 1 as the
    process exit status. Records that are empty or carry an unknown packet
    type are reported and skipped.
    """
    while True:
        snoop_hdr = recv_exact(btsnoop_sock, SNOOP_HDR)
        if snoop_hdr is None:
            print("Error: Snoop connection closed.. exiting")
            return 1

        # Only the first three 32-bit big-endian fields are used: original
        # length, included length and flags.
        olen, ilen, flags = struct.unpack(">LLL", snoop_hdr[0:12])
        timestamp = c_longlong(get_file_time())

        # The payload that follows the header is 'included length' bytes
        # long; reading 'original length' would misalign the stream for a
        # truncated record.
        snoop_data = recv_exact(btsnoop_sock, ilen)
        if snoop_data is None:
            print("Error: Snoop connection closed.. exiting")
            return 1

        print("Bytes received %d Olen %d ilen %d flags %d" %
              (len(snoop_data), olen, ilen, flags))
        if not snoop_data:
            print("Error: Empty snoop record.. skipping")
            continue

        packet_type = struct.unpack(">B", snoop_data[0:1])[0]
        frame_info = get_frame_info(packet_type, flags)
        if frame_info is None:
            print("Error: Unknown packet type %d.. skipping" % packet_type)
            continue
        drf, isend = frame_info

        # The leading packet type byte is not part of the frame sent to FTS.
        result = live_import.SendFrame(olen - 1, ilen - 1, snoop_data[1:],
                                       drf, isend, timestamp)
        if result < 0:
            print("Send frame failed")


def run_sniffer(live_import):
    """
    Bring up FTS and the adb forward, then stream snoop records to FTS.

    Returns the process exit status (non-zero on every failure). The snoop
    socket is always closed before returning or propagating an exception.
    """
    if not check_live_import_connection(live_import):
        return 1

    # Wait until the forward socket is ready
    print("Waiting until adb is ready")
    adb_status = os.system(
        'adb wait-for-device forward tcp:%d tcp:%d' % (PORT, PORT))
    if adb_status != 0:
        # Best effort: a forward set up by other means may still work.
        print("Warning: adb forward returned %d" % adb_status)

    btsnoop_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        btsnoop_sock.connect((HOST, PORT))
        snoop_id = recv_exact(btsnoop_sock, SNOOP_ID)
        if snoop_id is None or not snoop_id.startswith(b"btsnoop"):
            print("Error: Snoop ID wasn't received.. exiting")
            return 1
        return forward_snoop_frames(live_import, btsnoop_sock)
    except socket.error as err:
        print("Error: Snoop socket failure (%s).. exiting" % err)
        return 1
    finally:
        btsnoop_sock.close()


def main():
    """
    Script entry point: set up Live Import and forward snoop data to FTS.

    Always terminates through sys.exit(): status 1 on any error, 0 when the
    user stops the capture with Ctrl-C.
    """
    print("Bluetooth Virtual Sniffing for Fluoride")
    connection_str = get_connection_string()
    if connection_str is None:
        print("Error: path to liveimport.ini is incorrect.. exiting")
        sys.exit(1)

    configuration_str = get_configuration_string()
    if configuration_str is None:
        print("Error: path to liveimport.ini is incorrect.. exiting")
        sys.exit(1)

    live_import = init_live_import(connection_str, configuration_str)
    if live_import is None:
        print("Error: Path to LiveImportAPI.dll is incorrect.. exiting")
        sys.exit(1)

    try:
        exit_status = run_sniffer(live_import)
    except KeyboardInterrupt:
        print("Cleanup and exit")
        exit_status = 0
    finally:
        # Release the DLL on every path, including unexpected exceptions.
        release_live_import(live_import)
    sys.exit(exit_status)


if __name__ == '__main__':
    main()