1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""This module provides the test utilities for audio tests using chameleon.""" 6 7# TODO (cychiang) Move test utilities from chameleon_audio_helpers 8# to this module. 9 10import logging 11import multiprocessing 12import os 13import time 14from contextlib import contextmanager 15 16from autotest_lib.client.common_lib import error 17from autotest_lib.client.cros import constants 18from autotest_lib.client.cros.audio import audio_analysis 19from autotest_lib.client.cros.audio import audio_data 20 21def check_audio_nodes(audio_facade, audio_nodes): 22 """Checks the node selected by Cros device is correct. 23 24 @param audio_facade: A RemoteAudioFacade to access audio functions on 25 Cros device. 26 27 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 28 expected selected output and input nodes. 29 30 @raises: error.TestFail if the nodes selected by Cros device are not expected. 31 32 """ 33 curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types() 34 out_audio_nodes, in_audio_nodes = audio_nodes 35 if (in_audio_nodes != None and 36 sorted(curr_in_nodes) != sorted(in_audio_nodes)): 37 raise error.TestFail('Wrong input node(s) selected %s ' 38 'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes))) 39 if (out_audio_nodes != None and 40 sorted(curr_out_nodes) != sorted(out_audio_nodes)): 41 raise error.TestFail('Wrong output node(s) selected %s ' 42 'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes))) 43 44 45def check_plugged_nodes(audio_facade, audio_nodes): 46 """Checks the nodes that are currently plugged on Cros device are correct. 47 48 @param audio_facade: A RemoteAudioFacade to access audio functions on 49 Cros device. 50 51 @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing 52 expected plugged output and input nodes. 53 54 @raises: error.TestFail if the plugged nodes on Cros device are not expected. 55 56 """ 57 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 58 out_audio_nodes, in_audio_nodes = audio_nodes 59 if (in_audio_nodes != None and 60 sorted(curr_in_nodes) != sorted(in_audio_nodes)): 61 raise error.TestFail('Wrong input node(s) plugged %s ' 62 'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes))) 63 if (out_audio_nodes != None and 64 sorted(curr_out_nodes) != sorted(out_audio_nodes)): 65 raise error.TestFail('Wrong output node(s) plugged %s ' 66 'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes))) 67 68 69def bluetooth_nodes_plugged(audio_facade): 70 """Checks bluetooth nodes are plugged. 71 72 @param audio_facade: A RemoteAudioFacade to access audio functions on 73 Cros device. 74 75 @raises: error.TestFail if either input or output bluetooth node is 76 not plugged. 77 78 """ 79 curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types() 80 return 'BLUETOOTH' in curr_out_nodes and 'BLUETOOTH' in curr_in_nodes 81 82 83def _get_board_name(host): 84 """Gets the board name. 85 86 @param host: The CrosHost object. 87 88 @returns: The board name. 89 90 """ 91 return host.get_board().split(':')[1] 92 93 94def correction_plug_unplug_for_audio(host, port): 95 """Plugs/unplugs several times for Cros device to detect audio. 96 97 For issue crbug.com/450101, Exynos HDMI driver has problem recognizing 98 HDMI audio, while display can be detected. Do several plug/unplug and wait 99 as a workaround. Note that port will be in unplugged state in the end if 100 extra plug/unplug is needed. 101 102 @param host: A CrosHost object. 103 @param port: A ChameleonVideoInput object. 104 105 """ 106 board = _get_board_name(host) 107 if board in ['peach_pit', 'peach_pi', 'daisy', 'daisy_spring', 108 'daisy_skate']: 109 logging.info('Need extra plug/unplug on board %s', board) 110 for _ in xrange(3): 111 port.plug() 112 time.sleep(3) 113 port.unplug() 114 time.sleep(3) 115 116 117def has_internal_speaker(host): 118 """Checks if the Cros device has speaker. 119 120 @param host: The CrosHost object. 121 122 @returns: True if Cros device has internal speaker. False otherwise. 123 124 """ 125 board_name = _get_board_name(host) 126 if host.get_board_type() == 'CHROMEBOX' and board_name != 'stumpy': 127 logging.info('Board %s does not have speaker.', board_name) 128 return False 129 return True 130 131 132def has_internal_microphone(host): 133 """Checks if the Cros device has internal microphone. 134 135 @param host: The CrosHost object. 136 137 @returns: True if Cros device has internal microphone. False otherwise. 138 139 """ 140 board_name = _get_board_name(host) 141 if host.get_board_type() == 'CHROMEBOX': 142 logging.info('Board %s does not have internal microphone.', board_name) 143 return False 144 return True 145 146 147def suspend_resume(host, suspend_time_secs, resume_network_timeout_secs=50): 148 """Performs the suspend/resume on Cros device. 149 150 @param suspend_time_secs: Time in seconds to let Cros device suspend. 151 @resume_network_timeout_secs: Time in seconds to let Cros device resume and 152 obtain network. 153 """ 154 def action_suspend(): 155 """Calls the host method suspend.""" 156 host.suspend(suspend_time=suspend_time_secs) 157 158 boot_id = host.get_boot_id() 159 proc = multiprocessing.Process(target=action_suspend) 160 logging.info("Suspending...") 161 proc.daemon = True 162 proc.start() 163 host.test_wait_for_sleep(suspend_time_secs / 3) 164 logging.info("DUT suspended! Waiting to resume...") 165 host.test_wait_for_resume( 166 boot_id, suspend_time_secs + resume_network_timeout_secs) 167 logging.info("DUT resumed!") 168 169 170def dump_cros_audio_logs(host, audio_facade, directory, suffix=''): 171 """Dumps logs for audio debugging from Cros device. 172 173 @param host: The CrosHost object. 174 @param audio_facade: A RemoteAudioFacade to access audio functions on 175 Cros device. 176 @directory: The directory to dump logs. 177 178 """ 179 def get_file_path(name): 180 """Gets file path to dump logs. 181 182 @param name: The file name. 183 184 @returns: The file path with an optional suffix. 185 186 """ 187 file_name = '%s.%s' % (name, suffix) if suffix else name 188 file_path = os.path.join(directory, file_name) 189 return file_path 190 191 audio_facade.dump_diagnostics(get_file_path('audio_diagnostics.txt')) 192 193 host.get_file('/var/log/messages', get_file_path('messages')) 194 195 host.get_file(constants.MULTIMEDIA_XMLRPC_SERVER_LOG_FILE, 196 get_file_path('multimedia_xmlrpc_server.log')) 197 198 199@contextmanager 200def monitor_no_nodes_changed(audio_facade, callback=None): 201 """Context manager to monitor nodes changed signal on Cros device. 202 203 Starts the counter in the beginning. Stops the counter in the end to make 204 sure there is no NodesChanged signal during the try block. 205 206 E.g. with monitor_no_nodes_changed(audio_facade): 207 do something on playback/recording 208 209 @param audio_facade: A RemoteAudioFacade to access audio functions on 210 Cros device. 211 @param fail_callback: The callback to call before raising TestFail 212 when there is unexpected NodesChanged signals. 213 214 @raises: error.TestFail if there is NodesChanged signal on 215 Cros device during the context. 216 217 """ 218 try: 219 audio_facade.start_counting_signal('NodesChanged') 220 yield 221 finally: 222 count = audio_facade.stop_counting_signal() 223 if count: 224 message = 'Got %d unexpected NodesChanged signal' % count 225 logging.error(message) 226 if callback: 227 callback() 228 raise error.TestFail(message) 229 230 231# The second dominant frequency should have energy less than -26dB of the 232# first dominant frequency in the spectrum. 233DEFAULT_SECOND_PEAK_RATIO = 0.05 234 235# Tolerate more for bluetooth audio using HSP. 236HSP_SECOND_PEAK_RATIO = 0.2 237 238# The deviation of estimated dominant frequency from golden frequency. 239DEFAULT_FREQUENCY_DIFF_THRESHOLD = 5 240 241def check_recorded_frequency( 242 golden_file, recorder, 243 second_peak_ratio=DEFAULT_SECOND_PEAK_RATIO, 244 frequency_diff_threshold=DEFAULT_FREQUENCY_DIFF_THRESHOLD, 245 ignore_frequencies=None, check_anomaly=False): 246 """Checks if the recorded data contains sine tone of golden frequency. 247 248 @param golden_file: An AudioTestData object that serves as golden data. 249 @param recorder: An AudioWidget used in the test to record data. 250 @param second_peak_ratio: The test fails when the second dominant 251 frequency has coefficient larger than this 252 ratio of the coefficient of first dominant 253 frequency. 254 @param frequency_diff_threshold: The maximum difference between estimated 255 frequency of test signal and golden 256 frequency. This value should be small for 257 signal passed through line. 258 @param ignore_frequencies: A list of frequencies to be ignored. The 259 component in the spectral with frequency too 260 close to the frequency in the list will be 261 ignored. The comparison of frequencies uses 262 frequency_diff_threshold as well. 263 @param check_anomaly: True to check anomaly in the signal. 264 265 @raises error.TestFail if the recorded data does not contain sine tone of 266 golden frequency. 267 268 """ 269 data_format = recorder.data_format 270 recorded_data = audio_data.AudioRawData( 271 binary=recorder.get_binary(), 272 channel=data_format['channel'], 273 sample_format=data_format['sample_format']) 274 275 errors = [] 276 277 for test_channel, golden_channel in enumerate(recorder.channel_map): 278 if golden_channel is None: 279 logging.info('Skipped channel %d', test_channel) 280 continue 281 282 signal = recorded_data.channel_data[test_channel] 283 saturate_value = audio_data.get_maximum_value_from_sample_format( 284 data_format['sample_format']) 285 normalized_signal = audio_analysis.normalize_signal( 286 signal, saturate_value) 287 spectral = audio_analysis.spectral_analysis( 288 normalized_signal, data_format['rate']) 289 290 if not spectral: 291 errors.append( 292 'Channel %d: Can not find dominant frequency.' % 293 test_channel) 294 295 golden_frequency = golden_file.frequencies[golden_channel] 296 logging.debug('Checking channel %s spectral %s against frequency %s', 297 test_channel, spectral, golden_frequency) 298 299 dominant_frequency = spectral[0][0] 300 301 if (abs(dominant_frequency - golden_frequency) > 302 frequency_diff_threshold): 303 errors.append( 304 'Channel %d: Dominant frequency %s is away from golden %s' % 305 (test_channel, dominant_frequency, golden_frequency)) 306 307 if check_anomaly: 308 detected_anomaly = audio_analysis.anomaly_detection( 309 signal=normalized_signal, 310 rate=data_format['rate'], 311 freq=golden_frequency) 312 if detected_anomaly: 313 errors.append( 314 'Channel %d: Detect anomaly near these time: %s' % 315 (test_channel, detected_anomaly)) 316 else: 317 logging.info( 318 'Channel %d: Quality is good as there is no anomaly', 319 test_channel) 320 321 322 def should_be_ignored(frequency): 323 """Checks if frequency is close to any frequency in ignore list. 324 325 @param frequency: The frequency to be tested. 326 327 @returns: True if the frequency should be ignored. False otherwise. 328 329 """ 330 for ignore_frequency in ignore_frequencies: 331 if (abs(frequency - ignore_frequency) < 332 frequency_diff_threshold): 333 logging.debug('Ignore frequency: %s', frequency) 334 return True 335 336 # Filter out the frequencies to be ignored. 337 if ignore_frequencies: 338 spectral = [x for x in spectral if not should_be_ignored(x[0])] 339 340 if len(spectral) > 1: 341 first_coeff = spectral[0][1] 342 second_coeff = spectral[1][1] 343 if second_coeff > first_coeff * second_peak_ratio: 344 errors.append( 345 'Channel %d: Found large second dominant frequencies: ' 346 '%s' % (test_channel, spectral)) 347 348 if errors: 349 raise error.TestFail(', '.join(errors)) 350