1# -*- coding: utf-8 -*-
2# Copyright 2020 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""
6A script to extract raw SCO RX packets from btsnoop.
7Use 'btmon -S' to dump SCO traffic from btsnoop file.
8Trim the btsnoop output to just the SCO traffic period.
9Then execute 'python parse-sco.py <btsnoop-output>'
10"""
11
12import atexit
13import binascii
14import os
15import re
16import sys
17
18
class SCOParser:
  """
  Parser for grepping SCO packets

  Scans a textual btmon dump line by line and writes every "SCO Data RX"
  packet (a 3-byte header followed by the hex-dumped payload) to binary
  files named sco_file_<N> in the current working directory. A new output
  file is started whenever a "+CIEV: 4,1" line is seen.
  """

  # Number of hex-dump lines that follow each "SCO Data RX" header line
  # (60 payload bytes printed 16 per line, as in the sample in __init__).
  _DATA_LINES_PER_PACKET = 4

  # Leading characters of a hex-dump line that hold the hex byte columns;
  # whatever follows (the ASCII rendering) is ignored.
  _HEX_COLUMNS_WIDTH = 56

  def __init__(self):
    """
    Compile the line patterns, reset the counters and register the
    cleanup hook that closes the last output file at interpreter exit.
    """
    # On old releases, +CIEV: 4,1 indicates the start point of call session
    # c 31 0d 0a 9a     ..+CIEV: 4,1..
    self.call_start_re = re.compile(r'.*?\+CIEV:\s4,(\d).*?')

    # > SCO Data RX: Handle 257 flags 0x00 dlen 60           #13826 [hci0] 650.388305
    #         00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
    #         00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
    #         00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
    #         00 00 00 00 00 00 00 00 00 00 00 00
    # NOTE: the captured flags group only accepts decimal digits.
    self.sco_rx_re = re.compile(r'.*?SCO\sData\sRX.*?flags\s0x(\d+).*?')

    # Currently open output file, or None when no session has started yet.
    self.sco_f = None
    # Index used to name the next output file.
    self.output_idx = 0
    # Packets written to the current file / those with a non-zero flag.
    self.pk_count = 0
    self.pl_count = 0

    atexit.register(self._cleanup)

  def _cleanup(self):
    """
    Report the packet statistics of the current output file and close it.

    Does nothing when no output file is open. Registered with atexit in
    __init__ and also used when switching to a new output file.
    """
    if self.sco_f is None:
      return

    print(
        "Current file contains %d packets (%d with erroneous status flag)" %
        (self.pk_count, self.pl_count))
    self.pk_count = 0
    self.pl_count = 0
    self.sco_f.close()
    # Forget the closed file so that a later call (e.g. the atexit hook)
    # does not report the same file a second time.
    self.sco_f = None

  def _new_session(self):
    """
    Close the current output file, if any, and open the next one.

    Returns:
      The newly opened binary file object (also stored in self.sco_f).
    """
    # Finish the previous file first: print its statistics, reset the
    # counters and close it.
    self._cleanup()

    new_file = "sco_file_%d" % self.output_idx
    print("Record to %s" % new_file)
    self.sco_f = open(new_file, 'wb')
    self.output_idx += 1

    return self.sco_f

  def parse(self, filename):
    """
    Extract SCO RX packets from a textual btmon dump into sco_file_<N>.

    Args:
      filename: Path of the text dump to parse. When the path does not
        exist, a message is printed and nothing is parsed.

    Raises:
      binascii.Error: If a line expected to hold payload hex bytes contains
        something else (e.g. a packet with fewer hex-dump lines than
        expected).
    """
    if not os.path.exists(filename):
      print("%s doesn't exist" % filename)
      return

    print("Start parsing %s" % filename)
    # Hex-dump lines still expected for the packet currently being copied.
    pending_data_lines = 0
    with open(filename, "r") as f:
      # Iterate lazily instead of loading the whole dump into memory.
      for line in f:
        if pending_data_lines > 0:
          hex_digits = ''.join(line[:self._HEX_COLUMNS_WIDTH].split())
          self.sco_f.write(binascii.unhexlify(hex_digits))
          pending_data_lines -= 1
          # A payload line is never a session marker nor a packet header,
          # so do not let its ASCII column be matched below.
          continue

        # Start a new session and output following SCO data to a new file
        match = self.call_start_re.search(line)
        if match and int(match.group(1)) == 1:
          self._new_session()
          continue

        match = self.sco_rx_re.search(line)
        if not match:
          continue

        # SCO data seen before any call-start marker still gets recorded.
        if self.sco_f is None:
          self._new_session()

        self.pk_count += 1

        status_flag = int(match.group(1))
        if status_flag != 0:
          self.pl_count += 1

        # Fixed 3-byte header: 0x01, then the flag digit followed by '1',
        # then 0x3c (= 60, the dlen of the sample packet). Presumably this
        # is the little-endian handle 0x101 (257) with the status flag in
        # the upper nibble, plus the length byte -- TODO confirm.
        hdr = ['01', str(status_flag) + '1', '3c']
        self.sco_f.write(binascii.unhexlify(''.join(hdr)))
        pending_data_lines = self._DATA_LINES_PER_PACKET
95
96
def main(argv):
  """
  Command-line entry point.

  Args:
    argv: Command-line arguments without the program name. The first
      element is the path of the dump handed to SCOParser.parse(); when
      the list is empty a usage line is printed instead.
  """
  if len(argv) >= 1:
    parser = SCOParser()
    parser.parse(argv[0])
  else:
    print("parse_sco.py [btsnoop.txt]")
104
105
# When run as a script, forward the command-line arguments (without the
# program name) to main().
if __name__ == "__main__":
  main(sys.argv[1:])
108