1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import subprocess
8import tempfile
9
10from autotest_lib.client.bin import utils
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.cros.audio import audio_helper
13from autotest_lib.client.cros.audio import cmd_utils
14from autotest_lib.client.cros.audio import cras_utils
15from autotest_lib.client.cros.audio import sox_utils
16
17
# Frequencies, in Hz, of the two sine tones that loopback() plays at the same
# time: the first tone at the primary sample rate, the second at the secondary.
_TEST_TONE_ONE = 440
_TEST_TONE_TWO = 523
20
class audio_CRASFormatConversion(audio_helper.cras_rms_test):
    """Checks that sample rate conversion works in CRAS.

    For every ordered pair of the requested sample rates, two sine tones are
    played through CRAS at the same time (one per rate) while audio is
    captured.  The RMS of each noise-reduced recording is written as a perf
    keyval, together with the minimum RMS over all pairs.
    """

    version = 1

    # Sample rate, in Hz, used to capture and analyze the recording made
    # while both tones are playing.
    _CAPTURE_RATE = 44100


    def play_sine_tone(self, frequency, rate):
        """Plays a sine tone by cras and returns the processes.

        A sox process generates the tone on its stdout, which is piped into a
        non-blocking CRAS playback process.

        @param frequency: the frequency of the sine wave.
        @param rate: the sampling rate.

        @returns: a list [generator_process, playback_process]; the caller
                  is responsible for terminating both.
        """
        generator = cmd_utils.popen(
            sox_utils.generate_sine_tone_cmd(
                    filename='-', rate=rate, frequencies=frequency, gain=-6),
            stdout=subprocess.PIPE)
        try:
            player = cras_utils.playback(blocking=False,
                                         stdin=generator.stdout,
                                         playback_file='-',
                                         rate=rate)
        except Exception:
            # The generator would otherwise be orphaned: it is not returned
            # to the caller, so nobody else could clean it up.
            cmd_utils.kill_or_log_returncode(generator)
            raise

        # The player owns its own copy of the pipe now.  Drop ours so that
        # the generator receives SIGPIPE if the player exits early instead
        # of blocking forever on a pipe nobody reads.
        generator.stdout.close()
        return [generator, player]


    def wait_for_active_stream_count(self, expected_count):
        """Waits until the number of active streams matches the requested
        number or until a timeout occurs.

        The stream count is polled every 0.05 seconds for up to 1 second.

        @param expected_count: the exact number of streams required to
                               be active for execution to continue.

        @raise TestError: if a timeout occurs.
        """
        timeout_error = error.TestError(
                'Timeout waiting active stream count to become %d' %
                expected_count)
        utils.poll_for_condition(
                lambda: cras_utils.get_active_stream_count() == expected_count,
                exception=timeout_error,
                timeout=1, sleep_interval=0.05)


    def loopback(self, noise_profile, primary, secondary):
        """Gets the rms value of the recorded audio of playing two different
        tones (the 440 and 523 Hz sine wave) at the specified sampling rate.

        The raw recording is kept in the results directory as
        record-<primary>-<secondary>.wav; the noise-reduced copy is a
        temporary file that is removed before returning.

        @param noise_profile: The noise profile which is used to reduce the
                              noise of the recorded audio.
        @param primary: The first sample rate, HW will be set to this.
        @param secondary: The second sample rate, will be SRC'd to the first.

        @returns: the rms attribute of the sox stat of the noise-reduced
                  recording.

        @raise TestError: if the expected number of active streams is not
                          reached in time.
        @raise TestFail: if a tone generator or playback process exited
                         while the recording was being made.
        """
        popens = []
        record_file = os.path.join(
                self.resultsdir, 'record-%s-%s.wav' % (primary, secondary))

        # There should be no other active streams.
        self.wait_for_active_stream_count(0)

        # Start with the primary sample rate, then add the secondary.  This
        # causes the secondary to be SRC'd to the primary rate.
        try:
            # Play the first audio stream and make sure it has been played
            popens += self.play_sine_tone(_TEST_TONE_ONE, primary)
            self.wait_for_active_stream_count(1)

            # Play the second audio stream and make sure it has been played
            popens += self.play_sine_tone(_TEST_TONE_TWO, secondary)
            self.wait_for_active_stream_count(2)

            cras_utils.capture(
                    record_file, duration=1, rate=self._CAPTURE_RATE)

            # Make sure the playback is still in good shape
            if any(p.poll() is not None for p in popens):
                # More details are logged by the cleanup in finally.
                raise error.TestFail('process unexpectly stopped')

            # The temporary file is closed (and thereby deleted) as soon as
            # its statistics have been read.
            with tempfile.NamedTemporaryFile() as reduced_file:
                sox_utils.noise_reduce(
                        record_file, reduced_file.name, noise_profile,
                        rate=self._CAPTURE_RATE)
                sox_stat = sox_utils.get_stat(
                        reduced_file.name, rate=self._CAPTURE_RATE)

            logging.info('The sox stat of (%d, %d) is %s',
                         primary, secondary, sox_stat)

            return sox_stat.rms

        finally:
            # Always stop whatever was started, on success and on failure.
            cmd_utils.kill_or_log_returncode(*popens)


    def run_once(self, test_sample_rates):
        """Runs the format conversion test.

        Records one second of audio as a noise profile, measures the rms of
        the loopback recording for every ordered pair of sample rates, then
        captures once at each sample rate.  The results are written as perf
        keyvals named rms_value_<primary>_<secondary>, plus min_rms_value.

        @param test_sample_rates: a sequence of integer sample rates to
                                  test; it is iterated more than once.

        @raise TestError: if test_sample_rates is empty.
        """
        if not test_sample_rates:
            # Fail early and clearly; otherwise min() below would raise an
            # unhelpful ValueError on an empty result set.
            raise error.TestError('No sample rates were given to test')

        rms_values = {}

        # Record silence to use as the noise profile.
        noise_file = os.path.join(self.resultsdir, "noise.wav")
        cras_utils.capture(noise_file, duration=1)

        # The profile file has to stay alive until every pair is measured.
        with tempfile.NamedTemporaryFile() as noise_profile:
            sox_utils.noise_profile(noise_file, noise_profile.name)

            # Try all sample rate pairs.
            for primary in test_sample_rates:
                for secondary in test_sample_rates:
                    key = 'rms_value_%d_%d' % (primary, secondary)
                    rms_values[key] = self.loopback(
                            noise_profile.name, primary, secondary)

        # Record at all sample rates.  The recordings themselves are
        # discarded; only a successful capture matters here.
        with tempfile.NamedTemporaryFile() as record_file:
            for rate in test_sample_rates:
                cras_utils.capture(record_file.name, duration=1, rate=rate)

        # Add min_rms_value to the result
        rms_values['min_rms_value'] = min(rms_values.values())

        self.write_perf_keyval(rms_values)
138