# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
A script to extract raw SCO RX packets from btsnoop.
Use 'btmon -S' to dump SCO traffic from btsnoop file.
Trim the btsnoop output to just the SCO traffic period.
Then execute 'python parse-sco.py <btsnoop-output>'
"""

import atexit
import binascii
import os
import re
import sys


class SCOParser:
    """
    Parser for grepping SCO packets

    Scans a textual btmon dump line by line. Every 'SCO Data RX' record is
    written as a 3-byte header followed by the raw bytes of its hex dump to
    a file named sco_file_<N> in the current working directory. A new output
    file is started each time a '+CIEV: 4,1' line is seen, or lazily when
    the first SCO record appears before any such line.
    """

    # Number of hex-dump lines consumed after each 'SCO Data RX' header line.
    # The sample record documented in __init__ (dlen 60) spans four lines of
    # up to 16 bytes each.
    _PAYLOAD_LINES = 4

    # Only the first 56 characters of a dump line are decoded as hex; in the
    # sample record the ASCII rendering of the bytes starts after that.
    _HEX_COLUMNS = 56

    def __init__(self):
        """Compile the line matchers and register the exit-time cleanup."""
        # On old releases, +CIEV: 4,1 indicates the start point of call session
        #       c 31 0d 0a 9a     ..+CIEV: 4,1..
        self.call_start_re = re.compile(r'.*?\+CIEV:\s4,(\d).*?')

        # > SCO Data RX: Handle 257 flags 0x00 dlen 60    #13826 [hci0] 650.388305
        #         00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
        #         00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
        #         00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
        #         00 00 00 00 00 00 00 00 00 00 00 00
        self.sco_rx_re = re.compile(r'.*?SCO\sData\sRX.*?flags\s0x(\d+).*?')

        self.sco_f = None    # currently open output file, or None
        self.output_idx = 0  # numeric suffix of the next output file
        self.pk_count = 0    # packets written to the current file
        self.pl_count = 0    # packets in the current file with non-zero flags

        # Make sure the last output file gets closed (and its statistics
        # printed) even though parse() leaves it open when it returns.
        atexit.register(self._cleanup)

    def _cleanup(self):
        """Report statistics for the current output file and close it.

        Does nothing when no output file is open. The packet counters are
        reset and sco_f is set back to None, so calling this again (for
        example from the atexit hook) is harmless.
        """
        if self.sco_f is None:
            return

        print(
            "Current file contains %d packets (%d with erroneous status flag)" %
            (self.pk_count, self.pl_count))
        self.pk_count = 0
        self.pl_count = 0
        self.sco_f.close()
        self.sco_f = None

    def _new_session(self):
        """Close the current output file, if any, and open the next one.

        Returns:
            The newly opened binary file object (also stored in sco_f).
        """
        # The original code called an undefined global close() here, which
        # raised NameError as soon as a second session started. _cleanup()
        # closes the previous file and also reports/resets its statistics.
        self._cleanup()

        new_file = "sco_file_%d" % self.output_idx
        print("Record to %s" % new_file)
        self.sco_f = open(new_file, 'wb')
        self.output_idx += 1

        return self.sco_f

    def parse(self, filename):
        """Extract SCO RX packets from the btmon text dump *filename*.

        Prints a message and returns without doing anything else when the
        file does not exist. The last output file is left open on return; it
        is closed by _cleanup(), which is registered with atexit.

        Args:
            filename: path of the textual btmon output to read.

        Raises:
            binascii.Error (TypeError on Python 2): if a line that is
                expected to hold packet payload is not a valid hex dump.
        """
        if not os.path.exists(filename):
            print("%s doesn't exist" % filename)
            return

        print("Start parsing %s" % filename)

        # 0 means "not inside a packet"; 1.._PAYLOAD_LINES is the index of
        # the payload line expected next.
        parse_rx_data = 0
        with open(filename, "r") as f:
            # Stream the file instead of loading every line up front.
            for line in f:
                if parse_rx_data > 0:
                    hex_digits = ''.join(line[:self._HEX_COLUMNS].split())
                    self.sco_f.write(binascii.unhexlify(hex_digits))
                    # Wraps back to 0 after the last payload line.
                    parse_rx_data = ((parse_rx_data + 1) %
                                     (self._PAYLOAD_LINES + 1))

                # Start a new session and output following SCO data to a new
                # file
                match = self.call_start_re.search(line)
                if match and int(match.group(1)) == 1:
                    self._new_session()
                    continue

                match = self.sco_rx_re.search(line)
                if not match:
                    continue

                if self.sco_f is None:
                    self._new_session()

                self.pk_count += 1

                status_flag = int(match.group(1))
                if status_flag != 0:
                    self.pl_count += 1

                # 3-byte header written in front of every packet: 0x01, then
                # a byte whose high hex digit is the status flag and whose
                # low digit is 1, then 0x3c (= 60).
                # NOTE(review): this looks like a hard-coded connection
                # handle 257 (0x0101) and dlen 60, as in the sample record
                # above; other handles/lengths are not taken from the dump.
                hdr = ['01', str(status_flag) + '1', '3c']
                self.sco_f.write(binascii.unhexlify(''.join(hdr)))
                parse_rx_data = 1


def main(argv):
    """Parse the btmon dump named by the first element of *argv*.

    Prints a usage line and returns when *argv* is empty.
    """
    if len(argv) < 1:
        print("parse_sco.py [btsnoop.txt]")
        return

    p = SCOParser()
    p.parse(argv[0])


if __name__ == "__main__":
    main(sys.argv[1:])