1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import subprocess
8import tempfile
9
10from autotest_lib.client.bin import utils
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.cros.audio import audio_helper
13from autotest_lib.client.cros.audio import cmd_utils
14from autotest_lib.client.cros.audio import cras_utils
15from autotest_lib.client.cros.audio import sox_utils
16
17
# Frequencies (in Hz) of the two sine tones played simultaneously by the test.
_TEST_TONE_ONE = 440
_TEST_TONE_TWO = 523
20
class audio_CRASFormatConversion(audio_helper.cras_rms_test):
    """Checks that sample rate conversion works in CRAS.

    For every ordered pair of the requested sample rates, two sine tones are
    played through CRAS (one per rate) while audio is captured.  The rms value
    of each noise-reduced capture is reported as a perf keyval.
    """

    version = 1

    # Sample rate used to capture the loopback audio and to post-process the
    # captured file with sox.
    _CAPTURE_RATE = 44100


    def play_sine_tone(self, frequency, rate):
        """Plays a sine tone by cras and returns the processes.

        A sox process generates the tone on its stdout, which is piped into
        the stdin of a cras playback process.

        @param frequency: the frequency of the sine wave.
        @param rate: the sampling rate.

        @return: a list [generator process, playback process].  The caller is
                 responsible for terminating both of them.
        """
        p1 = cmd_utils.popen(
            sox_utils.generate_sine_tone_cmd(
                    filename='-', rate=rate, frequencies=frequency, gain=-6),
            stdout=subprocess.PIPE)
        try:
            p2 = cmd_utils.popen(
                cras_utils.playback_cmd(playback_file='-', rate=rate),
                stdin=p1.stdout)
        except Exception:
            # The generator was never handed to the caller, so nobody else
            # can clean it up; do not leave it running behind us.
            cmd_utils.kill_or_log_returncode(p1)
            raise
        return [p1, p2]


    def wait_for_active_stream_count(self, expected_count):
        """Waits until the number of active streams matches the requested
        number or until a timeout occurs.

        @param expected_count: the exact number of streams required to
                               be active for execution to continue.

        @raise TestError: if a timeout occurs.
        """
        utils.poll_for_condition(
                lambda: cras_utils.get_active_stream_count() == expected_count,
                exception=error.TestError(
                        'Timeout waiting active stream count to become %d' %
                        expected_count),
                timeout=1, sleep_interval=0.05)


    def loopback(self, noise_profile, primary, secondary):
        """Gets the rms value of the recorded audio of playing two different
        tones (the 440 and 523 Hz sine wave) at the specified sampling rate.

        @param noise_profile: The noise profile which is used to reduce the
                              noise of the recorded audio.
        @param primary: The first sample rate, HW will be set to this.
        @param secondary: The second sample rate, will be SRC'd to the first.

        @return: the rms value sox reports for the noise-reduced recording.

        @raise TestError: if the expected number of active streams is not
                          reached in time.
        @raise TestFail: if a playback process has exited by the time the
                         capture finishes.
        """
        popens = []

        record_file = os.path.join(self.resultsdir,
                'record-%s-%s.wav' % (primary, secondary))

        # There should be no other active streams.
        self.wait_for_active_stream_count(0)

        # Start with the primary sample rate, then add the secondary.  This
        # causes the secondary to be SRC'd to the primary rate.
        try:
            # Play the first audio stream and make sure it has been played
            popens += self.play_sine_tone(_TEST_TONE_ONE, primary)
            self.wait_for_active_stream_count(1)

            # Play the second audio stream and make sure it has been played
            popens += self.play_sine_tone(_TEST_TONE_TWO, secondary)
            self.wait_for_active_stream_count(2)

            cras_utils.capture(
                    record_file, duration=1, rate=self._CAPTURE_RATE)

            # Make sure the playback is still in good shape
            if any(p.poll() is not None for p in popens):
                # We will log more details later in finally.
                raise error.TestFail('process unexpectly stopped')

            # The noise-reduced audio is only needed to compute the stats, so
            # keep it in a temporary file that is removed right afterwards.
            with tempfile.NamedTemporaryFile() as reduced_file:
                sox_utils.noise_reduce(
                        record_file, reduced_file.name, noise_profile,
                        rate=self._CAPTURE_RATE)

                sox_stat = sox_utils.get_stat(
                        reduced_file.name, rate=self._CAPTURE_RATE)

            logging.info('The sox stat of (%d, %d) is %s',
                         primary, secondary, sox_stat)

            return sox_stat.rms

        finally:
            # Always stop the tone generators and players, whether the
            # measurement succeeded or not.
            cmd_utils.kill_or_log_returncode(*popens)


    def run_once(self, test_sample_rates):
        """Runs the format conversion test.

        Measures the rms value for every ordered (primary, secondary) pair of
        the given sample rates, captures once at each rate, and writes the
        collected values (plus their minimum) as perf keyvals.

        @param test_sample_rates: the sample rates to test.

        @raise TestError: if no sample rate is given.
        """
        # The rates are iterated several times below, so materialize them
        # once; this also lets us reject an empty list up front instead of
        # failing later with an opaque error from min().
        sample_rates = list(test_sample_rates)
        if not sample_rates:
            raise error.TestError('No sample rates given to test')

        rms_values = {}

        # Record silence to use as the noise profile.
        noise_file = os.path.join(self.resultsdir, "noise.wav")
        with tempfile.NamedTemporaryFile() as noise_profile:
            cras_utils.capture(noise_file, duration=1)
            sox_utils.noise_profile(noise_file, noise_profile.name)

            # Try all sample rate pairs.
            for primary in sample_rates:
                for secondary in sample_rates:
                    key = 'rms_value_%d_%d' % (primary, secondary)
                    rms_values[key] = self.loopback(
                            noise_profile.name, primary, secondary)

        # Record at all sample rates
        with tempfile.NamedTemporaryFile() as record_file:
            for rate in sample_rates:
                cras_utils.capture(record_file.name, duration=1, rate=rate)

        # Add min_rms_value to the result
        rms_values['min_rms_value'] = min(rms_values.values())

        self.write_perf_keyval(rms_values)
137