1#!/usr/bin/python
2import logging
3import numpy
4import unittest
5
6import common
7from autotest_lib.client.cros.audio import audio_analysis
8from autotest_lib.client.cros.audio import audio_data
9
10class SpectralAnalysisTest(unittest.TestCase):
11    def setUp(self):
12        """Uses the same seed to generate noise for each test."""
13        numpy.random.seed(0)
14
15
16    def testSpectralAnalysis(self):
17        rate = 48000
18        length_in_secs = 0.5
19        freq_1 = 490.0
20        freq_2 = 60.0
21        coeff_1 = 1
22        coeff_2 = 0.3
23        samples = length_in_secs * rate
24        noise = numpy.random.standard_normal(samples) * 0.005
25        x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
26        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) +
27             coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
28        results = audio_analysis.spectral_analysis(y, rate)
29        # Results should contains
30        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
31        # frequency with coefficient 1, 60Hz is the second dominant frequency
32        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
33        # around 0.1. The k constant is resulted from window function.
34        logging.debug('Results: %s', results)
35        self.assertTrue(abs(results[0][0]-freq_1) < 1)
36        self.assertTrue(abs(results[1][0]-freq_2) < 1)
37        self.assertTrue(
38                abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01)
39
40
41    def testSpectralAnalysisRealData(self):
42        """This unittest checks the spectral analysis works on real data."""
43        binary = open('client/cros/audio/test_data/1k_2k.raw', 'r').read()
44        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
45        saturate_value = audio_data.get_maximum_value_from_sample_format(
46                'S32_LE')
47        golden_frequency = [1000, 2000]
48        for channel in [0, 1]:
49            normalized_signal = audio_analysis.normalize_signal(
50                    data.channel_data[channel],saturate_value)
51            spectral = audio_analysis.spectral_analysis(
52                    normalized_signal, 48000, 0.02)
53            logging.debug('channel %s: %s', channel, spectral)
54            self.assertTrue(abs(spectral[0][0] - golden_frequency[channel]) < 5,
55                            'Dominant frequency is not correct')
56
57
58    def testNotMeaningfulData(self):
59        """Checks that sepectral analysis rejects not meaningful data."""
60        rate = 48000
61        length_in_secs = 0.5
62        samples = length_in_secs * rate
63        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
64        noise = numpy.random.standard_normal(samples) * noise_amplitude
65        with self.assertRaises(audio_analysis.RMSTooSmallError):
66            results = audio_analysis.spectral_analysis(noise, rate)
67
68
69    def testEmptyData(self):
70        """Checks that sepectral analysis rejects empty data."""
71        with self.assertRaises(audio_analysis.EmptyDataError):
72            results = audio_analysis.spectral_analysis([], 100)
73
74
75class NormalizeTest(unittest.TestCase):
76    def testNormalize(self):
77        y = [1, 2, 3, 4, 5]
78        normalized_y = audio_analysis.normalize_signal(y, 10)
79        expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
80        for i in xrange(len(y)):
81            self.assertEqual(expected[i], normalized_y[i])
82
83
84class AnomalyTest(unittest.TestCase):
85    def setUp(self):
86        """Creates a test signal of sine wave."""
87        # Use the same seed for each test case.
88        numpy.random.seed(0)
89
90        self.block_size = 120
91        self.rate = 48000
92        self.freq = 440
93        length_in_secs = 0.25
94        self.samples = length_in_secs * self.rate
95        x = numpy.linspace(
96                0.0, (self.samples - 1) * 1.0 / self.rate, self.samples)
97        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)
98
99
100    def add_noise(self):
101        """Add noise to the test signal."""
102        noise_amplitude = 0.3
103        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
104        self.y = self.y + noise
105
106
107    def insert_anomaly(self):
108        """Inserts an anomaly to the test signal.
109
110        The anomaly self.anomaly_samples should be created before calling this
111        method.
112
113        """
114        self.anomaly_start_secs = 0.1
115        self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate),
116                              self.anomaly_samples)
117
118
119    def generate_skip_anomaly(self):
120        """Skips a section of test signal."""
121        self.anomaly_start_secs = 0.1
122        self.anomaly_duration_secs = 0.005
123        anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs
124        anomaly_start_index = self.anomaly_start_secs * self.rate
125        anomaly_append_index = anomaly_append_secs * self.rate
126        self.y = numpy.append(self.y[:anomaly_start_index], self.y[anomaly_append_index:])
127
128
129    def create_constant_anomaly(self, amplitude):
130        """Creates an anomaly of constant samples.
131
132        @param amplitude: The amplitude of the constant samples.
133
134        """
135        self.anomaly_duration_secs = 0.005
136        self.anomaly_samples = (
137                [amplitude] * int(self.anomaly_duration_secs * self.rate))
138
139
140    def run_analysis(self):
141        """Runs the anomaly detection."""
142        self.results = audio_analysis.anomaly_detection(
143                self.y, self.rate, self.freq, self.block_size)
144        logging.debug('Results: %s', self.results)
145
146
147    def check_no_anomaly(self):
148        """Verifies that there is no anomaly in detection result."""
149        self.run_analysis()
150        self.assertFalse(self.results)
151
152
153    def check_anomaly(self):
154        """Verifies that there is anomaly in detection result.
155
156        The detection result should contain anomaly time stamps that are
157        close to where anomaly was inserted. There can be multiple anomalies
158        since the detection depends on the block size.
159
160        """
161        self.run_analysis()
162        self.assertTrue(self.results)
163        # Anomaly can be detected as long as the detection window of block size
164        # overlaps with anomaly.
165        expected_detected_range_secs = (
166                self.anomaly_start_secs - float(self.block_size) / self.rate,
167                self.anomaly_start_secs + self.anomaly_duration_secs)
168        for detected_secs in self.results:
169            self.assertTrue(detected_secs <= expected_detected_range_secs[1])
170            self.assertTrue(detected_secs >= expected_detected_range_secs[0] )
171
172
173    def testGoodSignal(self):
174        """Sine wave signal with no noise or anomaly."""
175        self.check_no_anomaly()
176
177
178    def testGoodSignalNoise(self):
179        """Sine wave signal with noise."""
180        self.add_noise()
181        self.check_no_anomaly()
182
183
184    def testZeroAnomaly(self):
185        """Sine wave signal with no noise but with anomaly.
186
187        This test case simulates underrun in digital data where there will be
188        one block of samples with 0 amplitude.
189
190        """
191        self.create_constant_anomaly(0)
192        self.insert_anomaly()
193        self.check_anomaly()
194
195
196    def testZeroAnomalyNoise(self):
197        """Sine wave signal with noise and anomaly.
198
199        This test case simulates underrun in analog data where there will be
200        one block of samples with amplitudes close to 0.
201
202        """
203        self.create_constant_anomaly(0)
204        self.insert_anomaly()
205        self.add_noise()
206        self.check_anomaly()
207
208
209    def testLowConstantAnomaly(self):
210        """Sine wave signal with low constant anomaly.
211
212        The anomaly is one block of constant values.
213
214        """
215        self.create_constant_anomaly(0.05)
216        self.insert_anomaly()
217        self.check_anomaly()
218
219
220    def testLowConstantAnomalyNoise(self):
221        """Sine wave signal with low constant anomaly and noise.
222
223        The anomaly is one block of constant values.
224
225        """
226        self.create_constant_anomaly(0.05)
227        self.insert_anomaly()
228        self.add_noise()
229        self.check_anomaly()
230
231
232    def testHighConstantAnomaly(self):
233        """Sine wave signal with high constant anomaly.
234
235        The anomaly is one block of constant values.
236
237        """
238        self.create_constant_anomaly(2)
239        self.insert_anomaly()
240        self.check_anomaly()
241
242
243    def testHighConstantAnomalyNoise(self):
244        """Sine wave signal with high constant anomaly and noise.
245
246        The anomaly is one block of constant values.
247
248        """
249        self.create_constant_anomaly(2)
250        self.insert_anomaly()
251        self.add_noise()
252        self.check_anomaly()
253
254
255    def testSkippedAnomaly(self):
256        """Sine wave signal with skipped anomaly.
257
258        The anomaly simulates the symptom where a block is skipped.
259
260        """
261        self.generate_skip_anomaly()
262        self.check_anomaly()
263
264
265    def testSkippedAnomalyNoise(self):
266        """Sine wave signal with skipped anomaly with noise.
267
268        The anomaly simulates the symptom where a block is skipped.
269
270        """
271        self.generate_skip_anomaly()
272        self.add_noise()
273        self.check_anomaly()
274
275
276    def testEmptyData(self):
277        """Checks that anomaly detection rejects empty data."""
278        self.y = []
279        with self.assertRaises(audio_analysis.EmptyDataError):
280            self.check_anomaly()
281
282
283if __name__ == '__main__':
284    logging.basicConfig(level=logging.DEBUG)
285    unittest.main()
286