1#!/usr/bin/python
2import logging
3import numpy
4import os
5import unittest
6
7import common
8from autotest_lib.client.cros.audio import audio_analysis
9from autotest_lib.client.cros.audio import audio_data
10
11class SpectralAnalysisTest(unittest.TestCase):
12    def setUp(self):
13        """Uses the same seed to generate noise for each test."""
14        numpy.random.seed(0)
15
16
17    def dummy_peak_detection(self, array, window_size):
18        """Detects peaks in an array in simple way.
19
20        A point (i, array[i]) is a peak if array[i] is the maximum among
21        array[i - half_window_size] to array[i + half_window_size].
22        If array[i - half_window_size] to array[i + half_window_size] are all
23        equal, then there is no peak in this window.
24
25        @param window_size: The window to detect peaks.
26
27        @returns: A list of tuples:
28                  [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
29                   ...]
30                  where the tuples are sorted by peak values.
31
32        """
33        half_window_size = window_size / 2
34        length = len(array)
35
36        def mid_is_peak(array, mid, left, right):
37            """Checks if value at mid is the largest among left to right.
38
39            @param array: A list of numbers.
40            @param mid: The mid index.
41            @param left: The left index.
42            @param rigth: The right index.
43
44            @returns: True if array[index] is the maximum among numbers in array
45                      between index [left, right] inclusively.
46
47            """
48            value_mid = array[mid]
49            for index in xrange(left, right + 1):
50                if index == mid:
51                    continue
52                if array[index] >= value_mid:
53                    return False
54            return True
55
56        results = []
57        for mid in xrange(length):
58            left = max(0, mid - half_window_size)
59            right = min(length - 1, mid + half_window_size)
60            if mid_is_peak(array, mid, left, right):
61                results.append((mid, array[mid]))
62
63        # Sort the peaks by values.
64        return sorted(results, key=lambda x: x[1], reverse=True)
65
66
67    def testPeakDetection(self):
68        array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
69        result = audio_analysis.peak_detection(array, 4)
70        golden_answer = [(12, 5), (4, 4)]
71        self.assertEqual(result, golden_answer)
72
73
74    def testPeakDetectionLarge(self):
75        array = numpy.random.uniform(0, 1, 1000000)
76        window_size = 100
77        logging.debug('Test large array using dummy peak detection')
78        dummy_answer = self.dummy_peak_detection(array, window_size)
79        logging.debug('Test large array using improved peak detection')
80        improved_answer = audio_analysis.peak_detection(array, window_size)
81        logging.debug('Compare the result')
82        self.assertEqual(dummy_answer, improved_answer)
83
84
85    def testSpectralAnalysis(self):
86        rate = 48000
87        length_in_secs = 0.5
88        freq_1 = 490.0
89        freq_2 = 60.0
90        coeff_1 = 1
91        coeff_2 = 0.3
92        samples = length_in_secs * rate
93        noise = numpy.random.standard_normal(samples) * 0.005
94        x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
95        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) +
96             coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
97        results = audio_analysis.spectral_analysis(y, rate)
98        # Results should contains
99        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
100        # frequency with coefficient 1, 60Hz is the second dominant frequency
101        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
102        # around 0.1. The k constant is resulted from window function.
103        logging.debug('Results: %s', results)
104        self.assertTrue(abs(results[0][0]-freq_1) < 1)
105        self.assertTrue(abs(results[1][0]-freq_2) < 1)
106        self.assertTrue(
107                abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01)
108
109
110    def testSpectralAnalysisRealData(self):
111        """This unittest checks the spectral analysis works on real data."""
112        file_path = os.path.join(
113                os.path.dirname(__file__), 'test_data', '1k_2k.raw')
114        binary = open(file_path, 'r').read()
115        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
116        saturate_value = audio_data.get_maximum_value_from_sample_format(
117                'S32_LE')
118        golden_frequency = [1000, 2000]
119        for channel in [0, 1]:
120            normalized_signal = audio_analysis.normalize_signal(
121                    data.channel_data[channel],saturate_value)
122            spectral = audio_analysis.spectral_analysis(
123                    normalized_signal, 48000, 0.02)
124            logging.debug('channel %s: %s', channel, spectral)
125            self.assertTrue(abs(spectral[0][0] - golden_frequency[channel]) < 5,
126                            'Dominant frequency is not correct')
127
128
129    def testNotMeaningfulData(self):
130        """Checks that sepectral analysis handles un-meaningful data."""
131        rate = 48000
132        length_in_secs = 0.5
133        samples = length_in_secs * rate
134        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
135        noise = numpy.random.standard_normal(samples) * noise_amplitude
136        results = audio_analysis.spectral_analysis(noise, rate)
137        self.assertEqual([(0, 0)], results)
138
139
140    def testEmptyData(self):
141        """Checks that sepectral analysis rejects empty data."""
142        with self.assertRaises(audio_analysis.EmptyDataError):
143            results = audio_analysis.spectral_analysis([], 100)
144
145
class NormalizeTest(unittest.TestCase):
    """Tests for audio_analysis.normalize_signal."""

    def testNormalize(self):
        """Checks that each sample is divided by the saturate value."""
        y = [1, 2, 3, 4, 5]
        normalized_y = audio_analysis.normalize_signal(y, 10)
        expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
        # Guard against extra or missing samples, which an element-wise loop
        # alone would not notice.
        self.assertEqual(len(expected), len(normalized_y))
        for expected_value, actual_value in zip(expected, normalized_y):
            # Compare floats with a tolerance rather than exact equality.
            self.assertAlmostEqual(expected_value, actual_value)
153
154
155class AnomalyTest(unittest.TestCase):
156    def setUp(self):
157        """Creates a test signal of sine wave."""
158        # Use the same seed for each test case.
159        numpy.random.seed(0)
160
161        self.block_size = 120
162        self.rate = 48000
163        self.freq = 440
164        length_in_secs = 0.25
165        self.samples = length_in_secs * self.rate
166        x = numpy.linspace(
167                0.0, (self.samples - 1) * 1.0 / self.rate, self.samples)
168        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)
169
170
171    def add_noise(self):
172        """Add noise to the test signal."""
173        noise_amplitude = 0.3
174        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
175        self.y = self.y + noise
176
177
178    def insert_anomaly(self):
179        """Inserts an anomaly to the test signal.
180
181        The anomaly self.anomaly_samples should be created before calling this
182        method.
183
184        """
185        self.anomaly_start_secs = 0.1
186        self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate),
187                              self.anomaly_samples)
188
189
190    def generate_skip_anomaly(self):
191        """Skips a section of test signal."""
192        self.anomaly_start_secs = 0.1
193        self.anomaly_duration_secs = 0.005
194        anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs
195        anomaly_start_index = self.anomaly_start_secs * self.rate
196        anomaly_append_index = anomaly_append_secs * self.rate
197        self.y = numpy.append(self.y[:anomaly_start_index], self.y[anomaly_append_index:])
198
199
200    def create_constant_anomaly(self, amplitude):
201        """Creates an anomaly of constant samples.
202
203        @param amplitude: The amplitude of the constant samples.
204
205        """
206        self.anomaly_duration_secs = 0.005
207        self.anomaly_samples = (
208                [amplitude] * int(self.anomaly_duration_secs * self.rate))
209
210
211    def run_analysis(self):
212        """Runs the anomaly detection."""
213        self.results = audio_analysis.anomaly_detection(
214                self.y, self.rate, self.freq, self.block_size)
215        logging.debug('Results: %s', self.results)
216
217
218    def check_no_anomaly(self):
219        """Verifies that there is no anomaly in detection result."""
220        self.run_analysis()
221        self.assertFalse(self.results)
222
223
224    def check_anomaly(self):
225        """Verifies that there is anomaly in detection result.
226
227        The detection result should contain anomaly time stamps that are
228        close to where anomaly was inserted. There can be multiple anomalies
229        since the detection depends on the block size.
230
231        """
232        self.run_analysis()
233        self.assertTrue(self.results)
234        # Anomaly can be detected as long as the detection window of block size
235        # overlaps with anomaly.
236        expected_detected_range_secs = (
237                self.anomaly_start_secs - float(self.block_size) / self.rate,
238                self.anomaly_start_secs + self.anomaly_duration_secs)
239        for detected_secs in self.results:
240            self.assertTrue(detected_secs <= expected_detected_range_secs[1])
241            self.assertTrue(detected_secs >= expected_detected_range_secs[0] )
242
243
244    def testGoodSignal(self):
245        """Sine wave signal with no noise or anomaly."""
246        self.check_no_anomaly()
247
248
249    def testGoodSignalNoise(self):
250        """Sine wave signal with noise."""
251        self.add_noise()
252        self.check_no_anomaly()
253
254
255    def testZeroAnomaly(self):
256        """Sine wave signal with no noise but with anomaly.
257
258        This test case simulates underrun in digital data where there will be
259        one block of samples with 0 amplitude.
260
261        """
262        self.create_constant_anomaly(0)
263        self.insert_anomaly()
264        self.check_anomaly()
265
266
267    def testZeroAnomalyNoise(self):
268        """Sine wave signal with noise and anomaly.
269
270        This test case simulates underrun in analog data where there will be
271        one block of samples with amplitudes close to 0.
272
273        """
274        self.create_constant_anomaly(0)
275        self.insert_anomaly()
276        self.add_noise()
277        self.check_anomaly()
278
279
280    def testLowConstantAnomaly(self):
281        """Sine wave signal with low constant anomaly.
282
283        The anomaly is one block of constant values.
284
285        """
286        self.create_constant_anomaly(0.05)
287        self.insert_anomaly()
288        self.check_anomaly()
289
290
291    def testLowConstantAnomalyNoise(self):
292        """Sine wave signal with low constant anomaly and noise.
293
294        The anomaly is one block of constant values.
295
296        """
297        self.create_constant_anomaly(0.05)
298        self.insert_anomaly()
299        self.add_noise()
300        self.check_anomaly()
301
302
303    def testHighConstantAnomaly(self):
304        """Sine wave signal with high constant anomaly.
305
306        The anomaly is one block of constant values.
307
308        """
309        self.create_constant_anomaly(2)
310        self.insert_anomaly()
311        self.check_anomaly()
312
313
314    def testHighConstantAnomalyNoise(self):
315        """Sine wave signal with high constant anomaly and noise.
316
317        The anomaly is one block of constant values.
318
319        """
320        self.create_constant_anomaly(2)
321        self.insert_anomaly()
322        self.add_noise()
323        self.check_anomaly()
324
325
326    def testSkippedAnomaly(self):
327        """Sine wave signal with skipped anomaly.
328
329        The anomaly simulates the symptom where a block is skipped.
330
331        """
332        self.generate_skip_anomaly()
333        self.check_anomaly()
334
335
336    def testSkippedAnomalyNoise(self):
337        """Sine wave signal with skipped anomaly with noise.
338
339        The anomaly simulates the symptom where a block is skipped.
340
341        """
342        self.generate_skip_anomaly()
343        self.add_noise()
344        self.check_anomaly()
345
346
347    def testEmptyData(self):
348        """Checks that anomaly detection rejects empty data."""
349        self.y = []
350        with self.assertRaises(audio_analysis.EmptyDataError):
351            self.check_anomaly()
352
353
if __name__ == '__main__':
    # Emit debug-level logs with timestamp, logger name and level, then hand
    # control to the unittest runner.
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.DEBUG)
    unittest.main()
357