1# Copyright 2020 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""bluetooth audio test dat for A2DP, AVRCP, and HFP."""
6
7import logging
8import os
9import subprocess
10
11import common
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.bin import utils
14
15
16DIST_FILES = 'gs://chromeos-localmirror/distfiles'
17DOWNLOAD_TIMEOUT = 90 # timeout for gsutil downloads
18DATA_DIR = '/tmp'
19
20
21VISQOL_TARBALL = os.path.join(DIST_FILES, 'visqol-binary.tar.gz')
22# Path to ViSQOL tarball in autotest server
23VISQOL_TARBALL_LOCAL_PATH = os.path.join(DATA_DIR,
24                                         os.path.split(VISQOL_TARBALL)[1])
25VISQOL_FOLDER = os.path.join(DATA_DIR, 'visqol')
26VISQOL_PATH = os.path.join(VISQOL_FOLDER, 'visqol')
27# There are several available models for VISQOL, since these VISQOL based tests
28# are primarily for voice quality, this model is more tuned for voice quality.
29# experimentally, the scores have been fairly similar to the default model
30# TODO b:169251326 terms below are set outside of this codebase
31# and should be updated when possible. ("master" -> "main")
32# 'libsvm_nu_svr_model.txt'. Details: github.com/google/visqol/tree/master/model
33VISQOL_SIMILARITY_MODEL = os.path.join(
34        VISQOL_FOLDER, 'visqol.runfiles', '__main__', 'model',
35        'tcdvoip_nu.568_c5.31474325639_g3.17773760038_model.txt')
36VISQOL_TEST_DIR = os.path.join(VISQOL_FOLDER, 'bt-test-output')
37
38
39AUDIO_TARBALL = os.path.join(DIST_FILES, 'chameleon-bundle',
40                             'audio-test-data.tar.gz')
41AUDIO_TEST_DIR = '/usr/local/autotest/cros/audio/test_data'
42AUDIO_RECORD_DIR = os.path.join(DATA_DIR, 'audio')
43
44# AUDIO_TARBALL_NAME is the name of the tarball, i.e. audio-test-data.tar.gz
45AUDIO_TARBALL_NAME = os.path.split(AUDIO_TARBALL)[1]
46# AUDIO_TEST_DATA_DIR is the path of the audio-test-data directory,
47# i.e. /tmp/audio-test-data/
48AUDIO_TEST_DATA_DIR = os.path.join(DATA_DIR,
49                                   AUDIO_TARBALL_NAME.split('.', 1)[0])
50AUDIO_DATA_TARBALL_PATH = os.path.join(DATA_DIR, AUDIO_TARBALL_NAME)
51
52
53A2DP = 'a2dp'
54A2DP_LONG = 'a2dp_long'
55AVRCP = 'avrcp'
56HFP_NBS = 'hfp_nbs'
57HFP_WBS = 'hfp_wbs'
58VISQOL_BUFFER_LENGTH = 10.0
59
60
61common_test_data = {
62    'bit_width': 16,
63    'format': 'S16_LE',
64    'duration': 5,
65}
66
67
68def download_file_from_bucket(dir, file_address, verify_download):
69    """Extract tarball specified by tar_path to directory dir.
70
71    @param dir: Path to directory to download file to.
72    @param file_address: URL of the file to download.
73    @param verify_download: A function that accepts stdout, stderr, and the
74            process as args and verifies if the download succeeded.
75
76    @retuns: The result of a call to verify_download.
77    """
78    download_cmd = 'gsutil cp -r {0} {1}'.format(file_address, dir)
79    download_proc = subprocess.Popen(download_cmd.split(),
80                                     stdout=subprocess.PIPE,
81                                     stderr=subprocess.PIPE)
82
83    try:
84        stdout, stderr = utils.poll_for_condition(
85                download_proc.communicate,
86                error.TestError('Failed to download'), timeout=DOWNLOAD_TIMEOUT,
87                desc='Downloading {}'.format(file_address))
88    except Exception as e:
89        download_proc.terminate()
90        return False
91    else:
92        return verify_download(stdout, stderr, download_proc)
93
94
95def extract_tarball(dir, tar_path, verify_extraction):
96    """Extract tarball specified by tar_path to directory dir.
97
98    @param dir: Path to directory to extract to.
99    @param tar_path: Path to the tarball to extract.
100    @param verify_extraction: A function that accepts stdout, stderr, and the
101            process as args and verifies if the extraction succeeded.
102
103    @retuns: The result of a call to verify_extraction.
104    """
105    extract_cmd = 'tar -xf {0} -C {1}'.format(tar_path, dir)
106    extract_proc = subprocess.Popen(extract_cmd.split(), stdout=subprocess.PIPE,
107                                    stderr=subprocess.PIPE)
108
109    try:
110        stdout, stderr = utils.poll_for_condition(
111                extract_proc.communicate, error.TestError('Failed to extract'),
112                timeout=DOWNLOAD_TIMEOUT, desc='Extracting {}'.format(tar_path))
113    except Exception as e:
114        extract_proc.terminate()
115        return False
116    else:
117        return verify_extraction(stdout, stderr, extract_proc)
118
119
120def verify_visqol_extraction(stdout, stderr, process):
121    """Verify all important components of VISQOL are present in expected
122    locations.
123
124    @param stdout: Output of the extract process.
125    @param stderr: Error output of the extract process.
126    @param process: The Popen object of the extract process.
127
128    @returns: True if all required components are present and extraction process
129            suceeded.
130    """
131    return (not stderr and
132            os.path.isdir(VISQOL_FOLDER) and
133            os.path.isdir(VISQOL_TEST_DIR) and
134            os.path.exists(VISQOL_PATH) and
135            os.path.exists(VISQOL_SIMILARITY_MODEL))
136
137
138def get_visqol_binary():
139    """Download visqol binary.
140
141    If visqol binary not already available, download from DIST_FILES, otherwise
142    skip this step.
143    """
144    logging.debug('Downloading ViSQOL binary on autotest server')
145    if verify_visqol_extraction(None, None, None):
146        logging.debug('VISQOL binary already exists, skipping')
147        return
148
149    # download from VISQOL_TARBALL
150    if not download_file_from_bucket(DATA_DIR, VISQOL_TARBALL,
151                                     lambda _, __, p: p.returncode == 0):
152        raise error.TestError('Failed to download ViSQOL binary')
153    # Extract tarball tp DATA_DIR
154    if not extract_tarball(DATA_DIR, VISQOL_TARBALL_LOCAL_PATH,
155                           verify_visqol_extraction):
156        raise error.TestError('Failed to extract ViSQOL binary')
157
158
159def get_audio_test_data():
160    """Download audio test data files
161
162    Download and unzip audio files for audio tests from DIST_FILES.
163    """
164    logging.debug('Downloading audio test data on autotest server')
165
166    # download from AUDIO_TARBALL
167    if not download_file_from_bucket(DATA_DIR, AUDIO_TARBALL,
168                                     lambda _, __, p: p.returncode == 0):
169        raise error.TestError('Failed to download audio test data')
170    # Extract tarball to DATA_DIR
171    if not extract_tarball(
172            DATA_DIR, AUDIO_DATA_TARBALL_PATH,
173            lambda _, __, ___: os.path.isdir(AUDIO_TEST_DATA_DIR)):
174        raise error.TestError('Failed to extract audio test data')
175
176
177# Audio test data for hfp narrow band speech
178hfp_nbs_test_data = {
179    'rate': 8000,
180    'channels': 1,
181    'frequencies': (3500,),
182    'file': os.path.join(AUDIO_TEST_DIR,
183                         'sine_3500hz_rate8000_ch1_5secs.raw'),
184    'recorded_by_peer': os.path.join(AUDIO_RECORD_DIR,
185                                     'hfp_nbs_recorded_by_peer.wav'),
186    'recorded_by_dut': os.path.join(AUDIO_RECORD_DIR,
187                                    'hfp_nbs_recorded_by_dut.raw'),
188    'visqol_test_files': [
189        {
190            'file': os.path.join(AUDIO_TEST_DATA_DIR,
191                                 'voice_8k.wav'),
192            'recorded_by_peer': os.path.join(AUDIO_RECORD_DIR,
193                                             'voice_8k_deg_peer.wav'),
194            'recorded_by_dut': os.path.join(AUDIO_RECORD_DIR,
195                                            'voice_8k_deg_dut.raw'),
196            'channels': 1,
197            'rate': 8000,
198            'duration': 26.112 + VISQOL_BUFFER_LENGTH,
199            'bit_width': 16,
200            'format': 'S16_LE',
201            # convenient way to differentiate ViSQOL tests from regular tests
202            'visqol_test': True,
203            'encoding': 'signed-integer',
204            'speech_mode': True,
205            # Passing scored are determined mostly experimentally, the DUT as
206            # sink direction has issues and so for now the score set low.
207            # Ideally both scores should be set to >= 4.0 in fully functioning
208            # scenario.
209            'sink_passing_score': 0.0,
210            'source_passing_score': 4.0,
211        },
212        {
213            'file': os.path.join(AUDIO_TEST_DATA_DIR,
214                                 'sine_3500hz_rate8000_ch1_5secs.wav'),
215            'recorded_by_peer': os.path.join(AUDIO_RECORD_DIR,
216                                             'sine_3k_deg_peer.wav'),
217            'recorded_by_dut': os.path.join(AUDIO_RECORD_DIR,
218                                            'sine_3k_deg_dut.raw'),
219            'channels': 1,
220            'rate': 8000,
221            'duration': 5.0 + VISQOL_BUFFER_LENGTH,
222            'bit_width': 16,
223            'format': 'S16_LE',
224            # convenient way to differentiate ViSQOL tests from regular tests
225            'visqol_test': True,
226            'encoding': 'signed-integer',
227            'speech_mode': True,
228            # Sine tones don't work very well with ViSQOL on the NBS tests, both
229            # directions score fairly low, however I've kept it in as a test
230            # file because its a good for reference, makes it easy to see
231            # degradation and verify that this is transmitting the frequency
232            # range we would expect
233            'sink_passing_score': 1.0,
234            'source_passing_score': 2.0,
235        }
236    ]
237}
238hfp_nbs_test_data.update(common_test_data)
239
240
241# Audio test data for hfp wide band speech
242hfp_wbs_test_data = {
243    'rate': 16000,
244    'channels': 1,
245
246    'frequencies': (7000,),
247    'file': os.path.join(AUDIO_TEST_DIR,
248                         'sine_7000hz_rate16000_ch1_5secs.raw'),
249    'recorded_by_peer': os.path.join(AUDIO_RECORD_DIR,
250                                     'hfp_wbs_recorded_by_peer.wav'),
251    'recorded_by_dut': os.path.join(AUDIO_RECORD_DIR,
252                                    'hfp_wbs_recorded_by_dut.raw'),
253    'visqol_test_files': [
254        {
255            'file': os.path.join(AUDIO_TEST_DATA_DIR,
256                                 'voice.wav'),
257            'recorded_by_peer': os.path.join(AUDIO_RECORD_DIR,
258                                             'voice_deg_peer.wav'),
259            'recorded_by_dut': os.path.join(AUDIO_RECORD_DIR,
260                                            'voice_deg_dut.raw'),
261            'channels': 1,
262            'rate': 16000,
263            'duration': 26.112 + VISQOL_BUFFER_LENGTH,
264            'bit_width': 16,
265            'format': 'S16_LE',
266            # convenient way to differentiate ViSQOL tests from regular tests
267            'visqol_test': True,
268            'encoding': 'signed-integer',
269            'speech_mode': True,
270            # Passing scored are determined mostly experimentally, the DUT as
271            # sink direction has issues and so for now the score set low.
272            # Ideally both scores should be set to >= 4.0 in fully functioning
273            # scenario.
274            'sink_passing_score': 0.0,
275            'source_passing_score': 4.0,
276        },
277        {
278            'file': os.path.join(AUDIO_TEST_DATA_DIR,
279                                 'sine_7000hz_rate16000_ch1_5secs.wav'),
280            'recorded_by_peer': os.path.join(AUDIO_RECORD_DIR,
281                                             'sine_7k_deg_peer.wav'),
282            'recorded_by_dut': os.path.join(AUDIO_RECORD_DIR,
283                                            'sine_7k_deg_dut.raw'),
284            'channels': 1,
285            'rate': 16000,
286            'duration': 5.0 + VISQOL_BUFFER_LENGTH,
287            'bit_width': 16,
288            'format': 'S16_LE',
289            # convenient way to differentiate ViSQOL tests from regular tests
290            'visqol_test': True,
291            'encoding': 'signed-integer',
292            'speech_mode': True,
293            # Passing scored are determined mostly experimentally, the DUT as
294            # sink direction has issues and so for now the score set low.
295            # Ideally both scores should be set to >= 4.0 in fully functioning
296            # scenario.
297            'sink_passing_score': 0.0,
298            'source_passing_score': 4.0,
299        }
300    ]
301}
302hfp_wbs_test_data.update(common_test_data)
303
304
305# Audio test data for a2dp
306a2dp_test_data = {
307    'rate': 48000,
308    'channels': 2,
309    'frequencies': (440, 20000),
310    'file': os.path.join(AUDIO_TEST_DIR,
311                         'binaural_sine_440hz_20000hz_rate48000_%dsecs.raw'),
312    'recorded_by_peer': os.path.join(AUDIO_RECORD_DIR,
313                                     'a2dp_recorded_by_peer.raw'),
314    'chunk_in_secs': 5,
315}
316a2dp_test_data.update(common_test_data)
317
318
319# Audio test data for a2dp long test. The file and duration attributes
320# are dynamic and will be determined during run time.
321a2dp_long_test_data = a2dp_test_data.copy()
322a2dp_long_test_data.update({
323    'recorded_by_peer': os.path.join(AUDIO_RECORD_DIR,
324                                     'a2dp_long_recorded_by_peer.raw'),
325    'duration': 0,       # determined at run time
326    'chunk_in_secs': 1,
327})
328
329
330audio_test_data = {
331    A2DP: a2dp_test_data,
332    A2DP_LONG: a2dp_long_test_data,
333    HFP_WBS: hfp_wbs_test_data,
334    HFP_NBS: hfp_nbs_test_data,
335}
336