# -*- coding: utf-8 -*- # Copyright 2020 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """ A script to extract raw SCO RX packets from btsnoop. Use 'btmon -S' to dump SCO traffic from btsnoop file. Trim the btsnoop output to just the SCO traffic period. Then execute 'python parse-sco.py ' """ import atexit import binascii import os import re import sys class SCOParser: """ Parser for grepping SCO packets """ def __init__(self): # On old releases, +CIEV: 4,1 indicates the start point of call session # c 31 0d 0a 9a ..+CIEV: 4,1.. self.call_start_re = re.compile(r'.*?\+CIEV:\s4,(\d).*?') # > SCO Data RX: Handle 257 flags 0x00 dlen 60 #13826 [hci0] 650.388305 # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................ # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................ # 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................ # 00 00 00 00 00 00 00 00 00 00 00 00 self.sco_rx_re = re.compile(r'.*?SCO\sData\sRX.*?flags\s0x(\d+).*?') self.sco_f = None self.output_idx = 0 self.pk_count = 0 self.pl_count = 0 atexit.register(self._cleanup) def _cleanup(self): if self.sco_f is not None: print( "Current file contains %d packets (%d with erroneous status flag)" % (self.pk_count, self.pl_count)) self.pk_count = 0 self.pl_count = 0 self.sco_f.close() def _new_session(self): if self.sco_f is not None: close(self.sco_f) new_file = "sco_file_%d" % self.output_idx print("Record to %s" % new_file) self.sco_f = open(new_file, 'wb') self.output_idx += 1 return self.sco_f def parse(self, filename): if not os.path.exists(filename): print("%s doesn't exist" % filename) return print("Start parsing %s" % filename) parse_rx_data = 0 with open(filename, "r") as f: for line in f.readlines(): if parse_rx_data > 0: self.sco_f.write(binascii.unhexlify(''.join(line[:56].split()))) parse_rx_data = (parse_rx_data + 1) % 5 # Start a new session and output following SCO data to a new file match = self.call_start_re.search(line) if match and (1 == int(match.group(1))): self._new_session() continue match = self.sco_rx_re.search(line) if match: if self.sco_f is None: self._new_session() self.pk_count += 1 status_flag = int(match.group(1)) hdr = ['01', str(status_flag) + '1', '3c'] if status_flag != 0: self.pl_count += 1 self.sco_f.write(binascii.unhexlify(''.join(hdr))) parse_rx_data = 1 def main(argv): if len(argv) < 1: print("parse_sco.py [btsnoop.txt]") return p = SCOParser() p.parse(argv[0]) if __name__ == "__main__": main(sys.argv[1:])