# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import subprocess
import tempfile

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.audio import audio_helper
from autotest_lib.client.cros.audio import cmd_utils
from autotest_lib.client.cros.audio import cras_utils
from autotest_lib.client.cros.audio import sox_utils


# Frequencies (Hz) of the two sine tones played simultaneously.
_TEST_TONE_ONE = 440
_TEST_TONE_TWO = 523

# Sample rate (Hz) used to record the loopback and to post-process the
# recording with sox. Capture, noise reduction and stat must all agree.
_CAPTURE_RATE = 44100


class audio_CRASFormatConversion(audio_helper.cras_rms_test):
    """Checks that sample rate conversion works in CRAS."""

    version = 1


    def play_sine_tone(self, frequency, rate):
        """Plays a sine tone by cras and returns the processes.

        The tone is generated by a sox command writing to stdout, which is
        piped into a non-blocking cras playback reading from stdin.

        @param frequency: the frequency of the sine wave.
        @param rate: the sampling rate.

        @returns: a two element list: the tone generator process followed
                  by the cras playback process.
        """
        p1 = cmd_utils.popen(
                sox_utils.generate_sine_tone_cmd(
                        filename='-', rate=rate, frequencies=frequency,
                        gain=-6),
                stdout=subprocess.PIPE)
        p2 = cras_utils.playback(blocking=False,
                                 stdin=p1.stdout,
                                 playback_file='-',
                                 rate=rate)
        return [p1, p2]


    def wait_for_active_stream_count(self, expected_count):
        """Waits until the number of active streams matches the requested
        number or until a timeout occurs.

        The stream count is polled every 0.05 seconds for up to 1 second.

        @param expected_count: the exact number of streams required to
                               be active for execution to continue.

        @raise TestError: if a timeout occurs.
        """
        utils.poll_for_condition(
                lambda: cras_utils.get_active_stream_count() == expected_count,
                exception=error.TestError(
                        'Timeout waiting active stream count to become %d' %
                        expected_count),
                timeout=1, sleep_interval=0.05)


    def loopback(self, noise_profile, primary, secondary):
        """Gets the rms value of the recorded audio of playing two different
        tones (the 440 and 523 Hz sine wave) at the specified sampling rate.

        The raw recording is kept in the results directory as
        'record-<primary>-<secondary>.wav'. The noise reduced copy lives in
        a temporary file that is removed before this method returns.

        @param noise_profile: The noise profile which is used to reduce the
                              noise of the recorded audio.
        @param primary: The first sample rate, HW will be set to this.
        @param secondary: The second sample rate, will be SRC'd to the first.

        @returns: the rms value reported by sox for the noise reduced
                  recording.

        @raise TestError: if the expected number of active streams is not
                          reached in time.
        @raise TestFail: if a playback related process has exited by the
                         time the recording is done.
        """
        popens = []

        record_file = os.path.join(self.resultsdir,
                                   'record-%s-%s.wav' % (primary, secondary))

        # There should be no other active streams.
        self.wait_for_active_stream_count(0)

        # Start with the primary sample rate, then add the secondary. This
        # causes the secondary to be SRC'd to the primary rate.
        try:
            # Play the first audio stream and make sure it has been played
            popens += self.play_sine_tone(_TEST_TONE_ONE, primary)
            self.wait_for_active_stream_count(1)

            # Play the second audio stream and make sure it has been played
            popens += self.play_sine_tone(_TEST_TONE_TWO, secondary)
            self.wait_for_active_stream_count(2)

            cras_utils.capture(record_file, duration=1, rate=_CAPTURE_RATE)

            # Make sure the playback is still in good shape
            if any(p.poll() is not None for p in popens):
                # We will log more details later in finally.
                raise error.TestFail('process unexpectly stopped')

            # The context manager closes (and thereby deletes) the
            # temporary file deterministically, even if sox fails.
            with tempfile.NamedTemporaryFile() as reduced_file:
                sox_utils.noise_reduce(
                        record_file, reduced_file.name, noise_profile,
                        rate=_CAPTURE_RATE)

                sox_stat = sox_utils.get_stat(
                        reduced_file.name, rate=_CAPTURE_RATE)

            logging.info('The sox stat of (%d, %d) is %s',
                         primary, secondary, str(sox_stat))

            return sox_stat.rms

        finally:
            # Always reap the tone generators and players, on success and
            # on failure alike.
            cmd_utils.kill_or_log_returncode(*popens)


    def run_once(self, test_sample_rates):
        """Runs the format conversion test.

        Measures the rms value for every (primary, secondary) pair of the
        given sample rates, then records once at every rate, and finally
        writes all rms values plus their minimum as perf keyvals.

        @param test_sample_rates: the sample rates to test. It is iterated
                                  several times and must not be empty,
                                  otherwise min() over the collected values
                                  raises ValueError.
        """
        rms_values = {}

        # Record silence to use as the noise profile.
        noise_file = os.path.join(self.resultsdir, "noise.wav")
        with tempfile.NamedTemporaryFile() as noise_profile:
            cras_utils.capture(noise_file, duration=1)
            sox_utils.noise_profile(noise_file, noise_profile.name)

            # Try all sample rate pairs. The noise profile file has to stay
            # alive until the last loopback has used it.
            for primary in test_sample_rates:
                for secondary in test_sample_rates:
                    key = 'rms_value_%d_%d' % (primary, secondary)
                    rms_values[key] = self.loopback(
                            noise_profile.name, primary, secondary)

        # Record at all sample rates. The recorded data is not inspected,
        # so a scratch file that is discarded afterwards is sufficient.
        with tempfile.NamedTemporaryFile() as record_file:
            for rate in test_sample_rates:
                cras_utils.capture(record_file.name, duration=1, rate=rate)

        # Add min_rms_value to the result
        rms_values['min_rms_value'] = min(rms_values.values())

        self.write_perf_keyval(rms_values)