1#!/usr/bin/python 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6 7import logging 8import numpy 9import os 10import re 11import tempfile 12import threading 13import time 14 15from glob import glob 16from autotest_lib.client.bin import test, utils 17from autotest_lib.client.bin.input.input_device import * 18from autotest_lib.client.common_lib import error 19from autotest_lib.client.cros.audio import alsa_utils 20from autotest_lib.client.cros.audio import audio_data 21from autotest_lib.client.cros.audio import cmd_utils 22from autotest_lib.client.cros.audio import cras_utils 23from autotest_lib.client.cros.audio import sox_utils 24 25LD_LIBRARY_PATH = 'LD_LIBRARY_PATH' 26 27_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics' 28 29_DEFAULT_NUM_CHANNELS = 2 30_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat' 31_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L' 32_DEFAULT_PLAYBACK_VOLUME = 100 33_DEFAULT_CAPTURE_GAIN = 2500 34_DEFAULT_ALSA_MAX_VOLUME = '100%' 35_DEFAULT_ALSA_CAPTURE_GAIN = '25dB' 36 37# Minimum RMS value to pass when checking recorded file. 38_DEFAULT_SOX_RMS_THRESHOLD = 0.08 39 40_JACK_VALUE_ON_RE = re.compile('.*values=on') 41_HP_JACK_CONTROL_RE = re.compile('numid=(\d+).*Headphone\sJack') 42_MIC_JACK_CONTROL_RE = re.compile('numid=(\d+).*Mic\sJack') 43 44_SOX_RMS_AMPLITUDE_RE = re.compile('RMS\s+amplitude:\s+(.+)') 45_SOX_ROUGH_FREQ_RE = re.compile('Rough\s+frequency:\s+(.+)') 46 47_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected' 48_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS' 49_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS' 50 51# Tools from platform/audiotest 52AUDIOFUNTEST_PATH = 'audiofuntest' 53AUDIOLOOP_PATH = 'looptest' 54LOOPBACK_LATENCY_PATH = 'loopback_latency' 55SOX_PATH = 'sox' 56TEST_TONES_PATH = 'test_tones' 57 58_MINIMUM_NORM = 0.001 59_CORRELATION_INDEX_THRESHOLD = 0.999 60# The minimum difference of estimated frequencies between two sine waves. 61_FREQUENCY_DIFF_THRESHOLD = 20 62# The minimum RMS value of meaningful audio data. 63_MEANINGFUL_RMS_THRESHOLD = 0.001 64 65def set_mixer_controls(mixer_settings={}, card='0'): 66 """Sets all mixer controls listed in the mixer settings on card. 67 68 @param mixer_settings: Mixer settings to set. 69 @param card: Index of audio card to set mixer settings for. 70 """ 71 logging.info('Setting mixer control values on %s', card) 72 for item in mixer_settings: 73 logging.info('Setting %s to %s on card %s', 74 item['name'], item['value'], card) 75 cmd = 'amixer -c %s cset name=%s %s' 76 cmd = cmd % (card, item['name'], item['value']) 77 try: 78 utils.system(cmd) 79 except error.CmdError: 80 # A card is allowed not to support all the controls, so don't 81 # fail the test here if we get an error. 82 logging.info('amixer command failed: %s', cmd) 83 84def set_volume_levels(volume, capture): 85 """Sets the volume and capture gain through cras_test_client. 86 87 @param volume: The playback volume to set. 88 @param capture: The capture gain to set. 89 """ 90 logging.info('Setting volume level to %d', volume) 91 try: 92 utils.system('/usr/bin/cras_test_client --volume %d' % volume) 93 logging.info('Setting capture gain to %d', capture) 94 utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture) 95 utils.system('/usr/bin/cras_test_client --dump_server_info') 96 utils.system('/usr/bin/cras_test_client --mute 0') 97 utils.system('amixer -c 0 contents') 98 except error.CmdError, e: 99 raise error.TestError( 100 '*** Can not tune volume through CRAS. *** (' + str(e) + ')') 101 102def loopback_latency_check(**args): 103 """Checks loopback latency. 104 105 @param args: additional arguments for loopback_latency. 106 107 @return A tuple containing measured and reported latency in uS. 108 Return None if no audio detected. 109 """ 110 noise_threshold = str(args['n']) if args.has_key('n') else '400' 111 112 cmd = '%s -n %s' % (LOOPBACK_LATENCY_PATH, noise_threshold) 113 114 output = utils.system_output(cmd, retain_output=True) 115 116 # Sleep for a short while to make sure device is not busy anymore 117 # after called loopback_latency. 118 time.sleep(.1) 119 120 measured_latency = None 121 reported_latency = None 122 for line in output.split('\n'): 123 match = re.search(_MEASURED_LATENCY_RE, line, re.I) 124 if match: 125 measured_latency = int(match.group(1)) 126 continue 127 match = re.search(_REPORTED_LATENCY_RE, line, re.I) 128 if match: 129 reported_latency = int(match.group(1)) 130 continue 131 if re.search(_AUDIO_NOT_FOUND_RE, line, re.I): 132 return None 133 if measured_latency and reported_latency: 134 return (measured_latency, reported_latency) 135 else: 136 # Should not reach here, just in case. 137 return None 138 139def get_mixer_jack_status(jack_reg_exp): 140 """Gets the mixer jack status. 141 142 @param jack_reg_exp: The regular expression to match jack control name. 143 144 @return None if the control does not exist, return True if jack control 145 is detected plugged, return False otherwise. 146 """ 147 output = utils.system_output('amixer -c0 controls', retain_output=True) 148 numid = None 149 for line in output.split('\n'): 150 m = jack_reg_exp.match(line) 151 if m: 152 numid = m.group(1) 153 break 154 155 # Proceed only when matched numid is not empty. 156 if numid: 157 output = utils.system_output('amixer -c0 cget numid=%s' % numid) 158 for line in output.split('\n'): 159 if _JACK_VALUE_ON_RE.match(line): 160 return True 161 return False 162 else: 163 return None 164 165def get_hp_jack_status(): 166 """Gets the status of headphone jack.""" 167 status = get_mixer_jack_status(_HP_JACK_CONTROL_RE) 168 if status is not None: 169 return status 170 171 # When headphone jack is not found in amixer, lookup input devices 172 # instead. 173 # 174 # TODO(hychao): Check hp/mic jack status dynamically from evdev. And 175 # possibly replace the existing check using amixer. 176 for evdev in glob('/dev/input/event*'): 177 device = InputDevice(evdev) 178 if device.is_hp_jack(): 179 return device.get_headphone_insert() 180 else: 181 return None 182 183def get_mic_jack_status(): 184 """Gets the status of mic jack.""" 185 status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE) 186 if status is not None: 187 return status 188 189 # When mic jack is not found in amixer, lookup input devices instead. 190 for evdev in glob('/dev/input/event*'): 191 device = InputDevice(evdev) 192 if device.is_mic_jack(): 193 return device.get_microphone_insert() 194 else: 195 return None 196 197def log_loopback_dongle_status(): 198 """Log the status of the loopback dongle to make sure it is equipped.""" 199 dongle_status_ok = True 200 201 # Check Mic Jack 202 mic_jack_status = get_mic_jack_status() 203 logging.info('Mic jack status: %s', mic_jack_status) 204 dongle_status_ok &= bool(mic_jack_status) 205 206 # Check Headphone Jack 207 hp_jack_status = get_hp_jack_status() 208 logging.info('Headphone jack status: %s', hp_jack_status) 209 dongle_status_ok &= bool(hp_jack_status) 210 211 # Use latency check to test if audio can be captured through dongle. 212 # We only want to know the basic function of dongle, so no need to 213 # assert the latency accuracy here. 214 latency = loopback_latency_check(n=4000) 215 if latency: 216 logging.info('Got latency measured %d, reported %d', 217 latency[0], latency[1]) 218 else: 219 logging.info('Latency check fail.') 220 dongle_status_ok = False 221 222 logging.info('audio loopback dongle test: %s', 223 'PASS' if dongle_status_ok else 'FAIL') 224 225# Functions to test audio palyback. 226def play_sound(duration_seconds=None, audio_file_path=None): 227 """Plays a sound file found at |audio_file_path| for |duration_seconds|. 228 229 If |audio_file_path|=None, plays a default audio file. 230 If |duration_seconds|=None, plays audio file in its entirety. 231 232 @param duration_seconds: Duration to play sound. 233 @param audio_file_path: Path to the audio file. 234 """ 235 if not audio_file_path: 236 audio_file_path = '/usr/local/autotest/cros/audio/sine440.wav' 237 duration_arg = ('-d %d' % duration_seconds) if duration_seconds else '' 238 utils.system('aplay %s %s' % (duration_arg, audio_file_path)) 239 240def get_play_sine_args(channel, odev='default', freq=1000, duration=10, 241 sample_size=16): 242 """Gets the command args to generate a sine wav to play to odev. 243 244 @param channel: 0 for left, 1 for right; otherwize, mono. 245 @param odev: alsa output device. 246 @param freq: frequency of the generated sine tone. 247 @param duration: duration of the generated sine tone. 248 @param sample_size: output audio sample size. Default to 16. 249 """ 250 cmdargs = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa', 251 odev, 'synth', str(duration)] 252 if channel == 0: 253 cmdargs += ['sine', str(freq), 'sine', '0'] 254 elif channel == 1: 255 cmdargs += ['sine', '0', 'sine', str(freq)] 256 else: 257 cmdargs += ['sine', str(freq)] 258 259 return cmdargs 260 261def play_sine(channel, odev='default', freq=1000, duration=10, 262 sample_size=16): 263 """Generates a sine wave and plays to odev. 264 265 @param channel: 0 for left, 1 for right; otherwize, mono. 266 @param odev: alsa output device. 267 @param freq: frequency of the generated sine tone. 268 @param duration: duration of the generated sine tone. 269 @param sample_size: output audio sample size. Default to 16. 270 """ 271 cmdargs = get_play_sine_args(channel, odev, freq, duration, sample_size) 272 utils.system(' '.join(cmdargs)) 273 274# Functions to compose customized sox command, execute it and process the 275# output of sox command. 276def get_sox_mixer_cmd(infile, channel, 277 num_channels=_DEFAULT_NUM_CHANNELS, 278 sox_format=_DEFAULT_SOX_FORMAT): 279 """Gets sox mixer command to reduce channel. 280 281 @param infile: Input file name. 282 @param channel: The selected channel to take effect. 283 @param num_channels: The number of total channels to test. 284 @param sox_format: Format to generate sox command. 285 """ 286 # Build up a pan value string for the sox command. 287 if channel == 0: 288 pan_values = '1' 289 else: 290 pan_values = '0' 291 for pan_index in range(1, num_channels): 292 if channel == pan_index: 293 pan_values = '%s%s' % (pan_values, ',1') 294 else: 295 pan_values = '%s%s' % (pan_values, ',0') 296 297 return '%s -c 2 %s %s -c 1 %s - mixer %s' % (SOX_PATH, 298 sox_format, infile, sox_format, pan_values) 299 300def sox_stat_output(infile, channel, 301 num_channels=_DEFAULT_NUM_CHANNELS, 302 sox_format=_DEFAULT_SOX_FORMAT): 303 """Executes sox stat command. 304 305 @param infile: Input file name. 306 @param channel: The selected channel. 307 @param num_channels: The number of total channels to test. 308 @param sox_format: Format to generate sox command. 309 310 @return The output of sox stat command 311 """ 312 sox_mixer_cmd = get_sox_mixer_cmd(infile, channel, 313 num_channels, sox_format) 314 stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format) 315 sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd) 316 return utils.system_output(sox_cmd, retain_output=True) 317 318def get_audio_rms(sox_output): 319 """Gets the audio RMS value from sox stat output 320 321 @param sox_output: Output of sox stat command. 322 323 @return The RMS value parsed from sox stat output. 324 """ 325 for rms_line in sox_output.split('\n'): 326 m = _SOX_RMS_AMPLITUDE_RE.match(rms_line) 327 if m is not None: 328 return float(m.group(1)) 329 330def get_rough_freq(sox_output): 331 """Gets the rough audio frequency from sox stat output 332 333 @param sox_output: Output of sox stat command. 334 335 @return The rough frequency value parsed from sox stat output. 336 """ 337 for rms_line in sox_output.split('\n'): 338 m = _SOX_ROUGH_FREQ_RE.match(rms_line) 339 if m is not None: 340 return int(m.group(1)) 341 342def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD): 343 """Checks if the calculated RMS value is expected. 344 345 @param sox_output: The output from sox stat command. 346 @param sox_threshold: The threshold to test RMS value against. 347 348 @raises error.TestError if RMS amplitude can't be parsed. 349 @raises error.TestFail if the RMS amplitude of the recording isn't above 350 the threshold. 351 """ 352 rms_val = get_audio_rms(sox_output) 353 354 # In case we don't get a valid RMS value. 355 if rms_val is None: 356 raise error.TestError( 357 'Failed to generate an audio RMS value from playback.') 358 359 logging.info('Got audio RMS value of %f. Minimum pass is %f.', 360 rms_val, sox_threshold) 361 if rms_val < sox_threshold: 362 raise error.TestFail( 363 'Audio RMS value %f too low. Minimum pass is %f.' % 364 (rms_val, sox_threshold)) 365 366def noise_reduce_file(in_file, noise_file, out_file, 367 sox_format=_DEFAULT_SOX_FORMAT): 368 """Runs the sox command to reduce noise. 369 370 Runs the sox command to noise-reduce in_file using the noise 371 profile from noise_file. 372 373 @param in_file: The file to noise reduce. 374 @param noise_file: The file containing the noise profile. 375 This can be created by recording silence. 376 @param out_file: The file contains the noise reduced sound. 377 @param sox_format: The sox format to generate sox command. 378 """ 379 prof_cmd = '%s -c 2 %s %s -n noiseprof' % (SOX_PATH, 380 sox_format, noise_file) 381 reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' % 382 (SOX_PATH, sox_format, in_file, sox_format, out_file)) 383 utils.system('%s | %s' % (prof_cmd, reduce_cmd)) 384 385def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND): 386 """Records a sample from the default input device. 387 388 @param tmpfile: The file to record to. 389 @param record_command: The command to record audio. 390 """ 391 utils.system('%s %s' % (record_command, tmpfile)) 392 393def create_wav_file(wav_dir, prefix=""): 394 """Creates a unique name for wav file. 395 396 The created file name will be preserved in autotest result directory 397 for future analysis. 398 399 @param wav_dir: The directory of created wav file. 400 @param prefix: specified file name prefix. 401 """ 402 filename = "%s-%s.wav" % (prefix, time.time()) 403 return os.path.join(wav_dir, filename) 404 405def run_in_parallel(*funs): 406 """Runs methods in parallel. 407 408 @param funs: methods to run. 409 """ 410 threads = [] 411 for f in funs: 412 t = threading.Thread(target=f) 413 t.start() 414 threads.append(t) 415 416 for t in threads: 417 t.join() 418 419def loopback_test_channels(noise_file_name, wav_dir, 420 playback_callback=None, 421 check_recorded_callback=check_audio_rms, 422 preserve_test_file=True, 423 num_channels = _DEFAULT_NUM_CHANNELS, 424 record_callback=record_sample, 425 mix_callback=None): 426 """Tests loopback on all channels. 427 428 @param noise_file_name: Name of the file contains pre-recorded noise. 429 @param wav_dir: The directory of created wav file. 430 @param playback_callback: The callback to do the playback for 431 one channel. 432 @param record_callback: The callback to do the recording. 433 @param check_recorded_callback: The callback to check recorded file. 434 @param preserve_test_file: Retain the recorded files for future debugging. 435 @param num_channels: The number of total channels to test. 436 @param mix_callback: The callback to do on the one-channel file. 437 """ 438 for channel in xrange(num_channels): 439 record_file_name = create_wav_file(wav_dir, 440 "record-%d" % channel) 441 functions = [lambda: record_callback(record_file_name)] 442 443 if playback_callback: 444 functions.append(lambda: playback_callback(channel)) 445 446 if mix_callback: 447 mix_file_name = create_wav_file(wav_dir, "mix-%d" % channel) 448 functions.append(lambda: mix_callback(mix_file_name)) 449 450 run_in_parallel(*functions) 451 452 if mix_callback: 453 sox_output_mix = sox_stat_output(mix_file_name, channel) 454 rms_val_mix = get_audio_rms(sox_output_mix) 455 logging.info('Got mixed audio RMS value of %f.', rms_val_mix) 456 457 sox_output_record = sox_stat_output(record_file_name, channel) 458 rms_val_record = get_audio_rms(sox_output_record) 459 logging.info('Got recorded audio RMS value of %f.', rms_val_record) 460 461 reduced_file_name = create_wav_file(wav_dir, 462 "reduced-%d" % channel) 463 noise_reduce_file(record_file_name, noise_file_name, 464 reduced_file_name) 465 466 sox_output_reduced = sox_stat_output(reduced_file_name, channel) 467 468 if not preserve_test_file: 469 os.unlink(reduced_file_name) 470 os.unlink(record_file_name) 471 if mix_callback: 472 os.unlink(mix_file_name) 473 474 check_recorded_callback(sox_output_reduced) 475 476 477def get_channel_sox_stat( 478 input_audio, channel_index, channels=2, bits=16, rate=48000): 479 """Gets the sox stat info of the selected channel in the input audio file. 480 481 @param input_audio: The input audio file to be analyzed. 482 @param channel_index: The index of the channel to be analyzed. 483 (1 for the first channel). 484 @param channels: The number of channels in the input audio. 485 @param bits: The number of bits of each audio sample. 486 @param rate: The sampling rate. 487 """ 488 if channel_index <= 0 or channel_index > channels: 489 raise ValueError('incorrect channel_indexi: %d' % channel_index) 490 491 if channels == 1: 492 return sox_utils.get_stat( 493 input_audio, channels=channels, bits=bits, rate=rate) 494 495 p1 = cmd_utils.popen( 496 sox_utils.extract_channel_cmd( 497 input_audio, '-', channel_index, 498 channels=channels, bits=bits, rate=rate), 499 stdout=cmd_utils.PIPE) 500 p2 = cmd_utils.popen( 501 sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate), 502 stdin=p1.stdout, stderr=cmd_utils.PIPE) 503 stat_output = p2.stderr.read() 504 cmd_utils.wait_and_check_returncode(p1, p2) 505 return sox_utils.parse_stat_output(stat_output) 506 507 508def get_rms(input_audio, channels=1, bits=16, rate=48000): 509 """Gets the RMS values of all channels of the input audio. 510 511 @param input_audio: The input audio file to be checked. 512 @param channels: The number of channels in the input audio. 513 @param bits: The number of bits of each audio sample. 514 @param rate: The sampling rate. 515 """ 516 stats = [get_channel_sox_stat( 517 input_audio, i + 1, channels=channels, bits=bits, 518 rate=rate) for i in xrange(channels)] 519 520 logging.info('sox stat: %s', [str(s) for s in stats]) 521 return [s.rms for s in stats] 522 523 524def reduce_noise_and_get_rms( 525 input_audio, noise_file, channels=1, bits=16, rate=48000): 526 """Reduces noise in the input audio by the given noise file and then gets 527 the RMS values of all channels of the input audio. 528 529 @param input_audio: The input audio file to be analyzed. 530 @param noise_file: The noise file used to reduce noise in the input audio. 531 @param channels: The number of channels in the input audio. 532 @param bits: The number of bits of each audio sample. 533 @param rate: The sampling rate. 534 """ 535 with tempfile.NamedTemporaryFile() as reduced_file: 536 p1 = cmd_utils.popen( 537 sox_utils.noise_profile_cmd( 538 noise_file, '-', channels=channels, bits=bits, 539 rate=rate), 540 stdout=cmd_utils.PIPE) 541 p2 = cmd_utils.popen( 542 sox_utils.noise_reduce_cmd( 543 input_audio, reduced_file.name, '-', 544 channels=channels, bits=bits, rate=rate), 545 stdin=p1.stdout) 546 cmd_utils.wait_and_check_returncode(p1, p2) 547 return get_rms(reduced_file.name, channels, bits, rate) 548 549 550def skip_devices_to_test(*boards): 551 """Devices to skip due to hardware or test compatibility issues. 552 553 @param boards: the boards to skip testing. 554 """ 555 # TODO(scottz): Remove this when crbug.com/220147 is fixed. 556 dut_board = utils.get_current_board() 557 if dut_board in boards: 558 raise error.TestNAError('This test is not available on %s' % dut_board) 559 560 561def cras_rms_test_setup(): 562 """Setups for the cras_rms_tests. 563 564 To make sure the line_out-to-mic_in path is all green. 565 """ 566 # TODO(owenlin): Now, the nodes are choosed by chrome. 567 # We should do it here. 568 cras_utils.set_system_volume(_DEFAULT_PLAYBACK_VOLUME) 569 cras_utils.set_selected_output_node_volume(_DEFAULT_PLAYBACK_VOLUME) 570 571 cras_utils.set_capture_gain(_DEFAULT_CAPTURE_GAIN) 572 573 cras_utils.set_system_mute(False) 574 cras_utils.set_capture_mute(False) 575 576 577def generate_rms_postmortem(): 578 """Generates postmortem for rms tests.""" 579 try: 580 logging.info('audio postmortem report') 581 log_loopback_dongle_status() 582 logging.info(get_audio_diagnostics()) 583 except Exception: 584 logging.exception('Error while generating postmortem report') 585 586 587def get_audio_diagnostics(): 588 """Gets audio diagnostic results. 589 590 @returns: a string containing diagnostic results. 591 592 """ 593 return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=cmd_utils.PIPE) 594 595 596def get_max_cross_correlation(signal_a, signal_b): 597 """Gets max cross-correlation and best time delay of two signals. 598 599 Computes cross-correlation function between two 600 signals and gets the maximum value and time delay. 601 The steps includes: 602 1. Compute cross-correlation function of X and Y and get Cxy. 603 The correlation function Cxy is an array where Cxy[k] is the 604 cross product of X and Y when Y is delayed by k. 605 Refer to manual of numpy.correlate for detail of correlation. 606 2. Find the maximum value C_max and index C_index in Cxy. 607 3. Compute L2 norm of X and Y to get norm(X) and norm(Y). 608 4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation. 609 610 Max cross-correlation indicates the similarity of X and Y. The value 611 is 1 if X equals Y multiplied by a positive scalar. 612 The value is -1 if X equals Y multiplied by a negative scaler. 613 Any constant level shift will be regarded as distortion and will make 614 max cross-correlation value deviated from 1. 615 C_index is the best time delay of Y that make Y looks similar to X. 616 Refer to http://en.wikipedia.org/wiki/Cross-correlation. 617 618 @param signal_a: A list of numbers which contains the first signal. 619 @param signal_b: A list of numbers which contains the second signal. 620 621 @raises: ValueError if any number in signal_a or signal_b is not a float. 622 ValueError if norm of any array is less than _MINIMUM_NORM. 623 624 @returns: A tuple (correlation index, best delay). If there are more than 625 one best delay, just return the first one. 626 """ 627 def check_list_contains_float(numbers): 628 """Checks the elements in a list are all float. 629 630 @param numbers: A list of numbers. 631 632 @raises: ValueError if there is any element which is not a float 633 in the list. 634 """ 635 if any(not isinstance(x, float) for x in numbers): 636 raise ValueError('List contains number which is not a float') 637 638 check_list_contains_float(signal_a) 639 check_list_contains_float(signal_b) 640 641 norm_a = numpy.linalg.norm(signal_a) 642 norm_b = numpy.linalg.norm(signal_b) 643 logging.debug('norm_a: %f', norm_a) 644 logging.debug('norm_b: %f', norm_b) 645 if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM: 646 raise ValueError('No meaningful data as norm is too small.') 647 648 correlation = numpy.correlate(signal_a, signal_b, 'full') 649 max_correlation = max(correlation) 650 best_delays = [i for i, j in enumerate(correlation) if j == max_correlation] 651 if len(best_delays) > 1: 652 logging.warning('There are more than one best delay: %r', best_delays) 653 return max_correlation / (norm_a * norm_b), best_delays[0] 654 655 656def trim_data(data, threshold=0): 657 """Trims a data by removing value that is too small in head and tail. 658 659 Removes elements in head and tail whose absolute value is smaller than 660 or equal to threshold. 661 E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) = 662 ([0.2, 0.3, 0.2], 2) 663 664 @param data: A list of numbers. 665 @param threshold: The threshold to compare against. 666 667 @returns: A tuple (trimmed_data, end_trimmed_length), where 668 end_trimmed_length is the length of original data being trimmed 669 from the end. 670 Returns ([], None) if there is no valid data. 671 """ 672 indice_valid = [ 673 i for i, j in enumerate(data) if abs(j) > threshold] 674 if not indice_valid: 675 logging.warning( 676 'There is no element with absolute value greater ' 677 'than threshold %f', threshold) 678 return [], None 679 logging.debug('Start and end of indice_valid: %d, %d', 680 indice_valid[0], indice_valid[-1]) 681 end_trimmed_length = len(data) - indice_valid[-1] - 1 682 logging.debug('Trimmed length in the end: %d', end_trimmed_length) 683 return (data[indice_valid[0] : indice_valid[-1] + 1], end_trimmed_length) 684 685 686def get_one_channel_correlation(test_data, golden_data): 687 """Gets max cross-correlation of test_data and golden_data. 688 689 Trims test data and compute the max cross-correlation against golden_data. 690 Signal can be trimmed because those zero values in the head and tail of 691 a signal will not affect correlation computation. 692 693 @param test_data: A list containing the data to compare against golden data. 694 @param golden_data: A list containing the golden data. 695 696 @returns: A tuple (max cross-correlation, best_delay) if data is valid. 697 Otherwise returns (None, None). Refer to docstring of 698 get_max_cross_correlation. 699 """ 700 trimmed_test_data, end_trimmed_length = trim_data(test_data) 701 702 def to_float(samples): 703 """Casts elements in the list to float. 704 705 @param samples: A list of numbers. 706 707 @returns: A list of original numbers casted to float. 708 """ 709 samples_float = [float(x) for x in samples] 710 return samples_float 711 712 max_cross_correlation, best_delay = get_max_cross_correlation( 713 to_float(golden_data), 714 to_float(trimmed_test_data)) 715 716 # The reason to add back the trimmed length in the end. 717 # E.g.: 718 # golden data: 719 # 720 # |-----------vvvv----------------| vvvv is the signal of interest. 721 # a b 722 # 723 # test data: 724 # 725 # |---x----vvvv--------x----------------| x is the place to trim. 726 # c d e f 727 # 728 # trimmed test data: 729 # 730 # |----vvvv--------| 731 # d e 732 # 733 # The first output of cross correlation computation : 734 # 735 # |-----------vvvv----------------| 736 # a b 737 # 738 # |----vvvv--------| 739 # d e 740 # 741 # The largest output of cross correlation computation happens at 742 # delay a + e. 743 # 744 # |-----------vvvv----------------| 745 # a b 746 # 747 # |----vvvv--------| 748 # d e 749 # 750 # Cross correlation starts computing by aligning the last sample 751 # of the trimmed test data to the first sample of golden data. 752 # The best delay calculated from trimmed test data and golden data 753 # cross correlation is e + a. But the real best delay that should be 754 # identical on two channel should be e + a + f. 755 # So we need to add back the length being trimmed in the end. 756 757 if max_cross_correlation: 758 return max_cross_correlation, best_delay + end_trimmed_length 759 else: 760 return None, None 761 762 763def compare_one_channel_correlation(test_data, golden_data, parameters): 764 """Compares two one-channel data by correlation. 765 766 @param test_data: A list containing the data to compare against golden data. 767 @param golden_data: A list containing the golden data. 768 @param parameters: A dict containing parameters for method. 769 770 @returns: A dict containing: 771 index: The index of similarity where 1 means they are different 772 only by a positive scale. 773 best_delay: The best delay of test data in relative to golden 774 data. 775 equal: A bool containing comparing result. 776 """ 777 if 'correlation_threshold' in parameters: 778 threshold = parameters['correlation_threshold'] 779 else: 780 threshold = _CORRELATION_INDEX_THRESHOLD 781 782 result_dict = dict() 783 max_cross_correlation, best_delay = get_one_channel_correlation( 784 test_data, golden_data) 785 result_dict['index'] = max_cross_correlation 786 result_dict['best_delay'] = best_delay 787 result_dict['equal'] = True if ( 788 max_cross_correlation and 789 max_cross_correlation > threshold) else False 790 logging.debug('result_dict: %r', result_dict) 791 return result_dict 792 793 794def compare_data_correlation(golden_data_binary, golden_data_format, 795 test_data_binary, test_data_format, 796 channel_map, parameters=None): 797 """Compares two raw data using correlation. 798 799 @param golden_data_binary: The binary containing golden data. 800 @param golden_data_format: The data format of golden data. 801 @param test_data_binary: The binary containing test data. 802 @param test_data_format: The data format of test data. 803 @param channel_map: A list containing channel mapping. 804 E.g. [1, 0, None, None, None, None, None, None] means 805 channel 0 of test data should map to channel 1 of 806 golden data. Channel 1 of test data should map to 807 channel 0 of golden data. Channel 2 to 7 of test data 808 should be skipped. 809 @param parameters: A dict containing parameters for method, if needed. 810 811 @raises: NotImplementedError if file type is not raw. 812 NotImplementedError if sampling rates of two data are not the same. 813 error.TestFail if golden data and test data are not equal. 814 """ 815 if parameters is None: 816 parameters = dict() 817 818 if (golden_data_format['file_type'] != 'raw' or 819 test_data_format['file_type'] != 'raw'): 820 raise NotImplementedError('Only support raw data in compare_data.') 821 if (golden_data_format['rate'] != test_data_format['rate']): 822 raise NotImplementedError( 823 'Only support comparing data with the same sampling rate') 824 golden_data = audio_data.AudioRawData( 825 binary=golden_data_binary, 826 channel=golden_data_format['channel'], 827 sample_format=golden_data_format['sample_format']) 828 test_data = audio_data.AudioRawData( 829 binary=test_data_binary, 830 channel=test_data_format['channel'], 831 sample_format=test_data_format['sample_format']) 832 compare_results = [] 833 for test_channel, golden_channel in enumerate(channel_map): 834 if golden_channel is None: 835 logging.info('Skipped channel %d', test_channel) 836 continue 837 test_data_one_channel = test_data.channel_data[test_channel] 838 golden_data_one_channel = golden_data.channel_data[golden_channel] 839 result_dict = dict(test_channel=test_channel, 840 golden_channel=golden_channel) 841 result_dict.update( 842 compare_one_channel_correlation( 843 test_data_one_channel, golden_data_one_channel, 844 parameters)) 845 compare_results.append(result_dict) 846 logging.info('compare_results: %r', compare_results) 847 for result in compare_results: 848 if not result['equal']: 849 error_msg = ('Failed on test channel %d and golden channel %d with ' 850 'index %f') % ( 851 result['test_channel'], 852 result['golden_channel'], 853 result['index']) 854 logging.error(error_msg) 855 raise error.TestFail(error_msg) 856 # Also checks best delay are exactly the same. 857 best_delays = set([result['best_delay'] for result in compare_results]) 858 if len(best_delays) > 1: 859 error_msg = 'There are more than one best delay: %s' % best_delays 860 logging.error(error_msg) 861 raise error.TestFail(error_msg) 862 863 864class _base_rms_test(test.test): 865 """Base class for all rms_test """ 866 867 def postprocess(self): 868 super(_base_rms_test, self).postprocess() 869 870 # Sum up the number of failed constraints in each iteration 871 if sum(len(x) for x in self.failed_constraints): 872 generate_rms_postmortem() 873 874 875class chrome_rms_test(_base_rms_test): 876 """Base test class for audio RMS test with Chrome. 877 878 The chrome instance can be accessed by self.chrome. 879 """ 880 def warmup(self): 881 skip_devices_to_test('x86-mario') 882 super(chrome_rms_test, self).warmup() 883 884 # Not all client of this file using telemetry. 885 # Just do the import here for those who really need it. 886 from autotest_lib.client.common_lib.cros import chrome 887 888 self.chrome = chrome.Chrome(init_network_controller=True) 889 890 # The audio configuration could be changed when we 891 # restart chrome. 892 try: 893 cras_rms_test_setup() 894 except Exception: 895 self.chrome.browser.Close() 896 raise 897 898 899 def cleanup(self, *args): 900 try: 901 self.chrome.close() 902 finally: 903 super(chrome_rms_test, self).cleanup() 904 905class cras_rms_test(_base_rms_test): 906 """Base test class for CRAS audio RMS test.""" 907 908 def warmup(self): 909 skip_devices_to_test('x86-mario') 910 super(cras_rms_test, self).warmup() 911 cras_rms_test_setup() 912 913 914def alsa_rms_test_setup(): 915 """Setup for alsa_rms_test. 916 917 Different boards/chipsets have different set of mixer controls. Even 918 controls that have the same name on different boards might have different 919 capabilities. The following is a general idea to setup a given class of 920 boards, and some specialized setup for certain boards. 921 """ 922 card_id = alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic') 923 arch = utils.get_arch() 924 board = utils.get_board() 925 uses_max98090 = os.path.exists('/sys/module/snd_soc_max98090') 926 if board in ['daisy_spring', 'daisy_skate']: 927 # The MIC controls of the boards do not support dB syntax. 928 alsa_utils.mixer_cmd(card_id, 929 'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME) 930 alsa_utils.mixer_cmd(card_id, 'sset MIC1 ' + _DEFAULT_ALSA_MAX_VOLUME) 931 alsa_utils.mixer_cmd(card_id, 'sset MIC2 ' + _DEFAULT_ALSA_MAX_VOLUME) 932 elif arch in ['armv7l', 'aarch64'] or uses_max98090: 933 # ARM platforms or Intel platforms that uses max98090 codec driver. 934 alsa_utils.mixer_cmd(card_id, 935 'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME) 936 alsa_utils.mixer_cmd(card_id, 'sset MIC1 ' + _DEFAULT_ALSA_CAPTURE_GAIN) 937 alsa_utils.mixer_cmd(card_id, 'sset MIC2 ' + _DEFAULT_ALSA_CAPTURE_GAIN) 938 else: 939 # The rest of Intel platforms. 940 alsa_utils.mixer_cmd(card_id, 'sset Master ' + _DEFAULT_ALSA_MAX_VOLUME) 941 alsa_utils.mixer_cmd(card_id, 942 'sset Capture ' + _DEFAULT_ALSA_CAPTURE_GAIN) 943 944 945class alsa_rms_test(_base_rms_test): 946 """Base test class for ALSA audio RMS test.""" 947 948 def warmup(self): 949 skip_devices_to_test('x86-mario') 950 super(alsa_rms_test, self).warmup() 951 952 alsa_rms_test_setup() 953