1#!/usr/bin/env python3
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18import numpy
19import os
20import unittest
21
22import acts.test_utils.audio_analysis_lib.audio_analysis as audio_analysis
23import acts.test_utils.audio_analysis_lib.audio_data as audio_data
24
25
class SpectralAnalysisTest(unittest.TestCase):
    """Unit tests for peak detection and spectral analysis."""

    def setUp(self):
        """Uses the same seed to generate noise for each test."""
        numpy.random.seed(0)

    def dummy_peak_detection(self, array, window_size):
        """Detects peaks in an array in a simple, brute-force way.

        A point (i, array[i]) is a peak if array[i] is strictly greater than
        every other value from array[i - half_window_size] to
        array[i + half_window_size], where half_window_size is
        window_size // 2 and the range is clipped to the array bounds.
        If all values in that range are equal, there is no peak in the
        window.

        Args:
            array: The input array to detect peaks in. Array is a list of
                absolute values of the magnitude of transformed coefficient.
            window_size: The window to detect peaks.

        Returns:
            A list of tuples:
                [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
                ...]
                where the tuples are sorted by peak values, largest first.

        """
        # Floor division keeps the half window an integer. True division
        # would give odd window sizes a fractional half width, which
        # truncates unevenly and makes the window asymmetric around mid.
        half_window_size = int(window_size // 2)
        length = len(array)

        def mid_is_peak(mid, left, right):
            """Checks if value at mid is the largest among left to right.

            Args:
                mid: The mid index.
                left: The left index.
                right: The right index.

            Returns:
                True if array[mid] is strictly greater than every other
                    number in array between index [left, right] inclusively.

            """
            value_mid = array[mid]
            for index in range(left, right + 1):
                # A tie with any neighbor disqualifies mid as a peak.
                if index != mid and array[index] >= value_mid:
                    return False
            return True

        results = [
            (mid, array[mid]) for mid in range(length)
            if mid_is_peak(mid,
                           max(0, mid - half_window_size),
                           min(length - 1, mid + half_window_size))
        ]

        # Sort the peaks by values.
        return sorted(results, key=lambda x: x[1], reverse=True)

    def testPeakDetection(self):
        """Checks peak detection against a small hand-computed answer."""
        array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
        result = audio_analysis.peak_detection(array, 4)
        golden_answer = [(12, 5), (4, 4)]
        self.assertEqual(result, golden_answer)

    def testPeakDetectionLarge(self):
        """Compares peak detection with the dummy one on a large array."""
        array = numpy.random.uniform(0, 1, 1000000)
        window_size = 100
        logging.debug('Test large array using dummy peak detection')
        dummy_answer = self.dummy_peak_detection(array, window_size)
        logging.debug('Test large array using improved peak detection')
        improved_answer = audio_analysis.peak_detection(array, window_size)
        logging.debug('Compare the result')
        self.assertEqual(dummy_answer, improved_answer)

    def testSpectralAnalysis(self):
        """Checks the dominant frequencies of a synthetic noisy signal."""
        rate = 48000
        length_in_secs = 0.5
        freq_1 = 490.0
        freq_2 = 60.0
        coeff_1 = 1
        coeff_2 = 0.3
        # The sample count must be an integer: numpy.linspace() rejects a
        # float for its num argument.
        samples = int(length_in_secs * rate)
        noise = numpy.random.standard_normal(samples) * 0.005
        x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) + coeff_2 *
             numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
        results = audio_analysis.spectral_analysis(y, rate)
        # Results should contain
        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
        # frequency with coefficient 1, 60Hz is the second dominant frequency
        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
        # around 0.1. The k constant results from the window function.
        logging.debug('Results: %s', results)
        self.assertLess(abs(results[0][0] - freq_1), 1)
        self.assertLess(abs(results[1][0] - freq_2), 1)
        self.assertLess(
            abs(results[0][1] / results[1][1] - coeff_1 / coeff_2), 0.01)

    def testSpectralAnalysisRealData(self):
        """This unittest checks the spectral analysis works on real data."""
        file_path = os.path.join(
            os.path.dirname(__file__), 'test_data', '1k_2k.raw')
        # Close the file deterministically instead of leaking the handle.
        with open(file_path, 'rb') as raw_file:
            binary = raw_file.read()
        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
        saturate_value = audio_data.get_maximum_value_from_sample_format(
            'S32_LE')
        # Expected dominant frequency of channel 0 and channel 1.
        golden_frequency = [1000, 2000]
        for channel in [0, 1]:
            normalized_signal = audio_analysis.normalize_signal(
                data.channel_data[channel], saturate_value)
            spectral = audio_analysis.spectral_analysis(normalized_signal,
                                                        48000, 0.02)
            logging.debug('channel %s: %s', channel, spectral)
            self.assertLess(
                abs(spectral[0][0] - golden_frequency[channel]), 5,
                'Dominant frequency is not correct')

    def testNotMeaningfulData(self):
        """Checks that spectral analysis handles un-meaningful data.

        The input is noise scaled to half of MEANINGFUL_RMS_THRESHOLD, for
        which the analysis is expected to report only [(0, 0)].

        """
        rate = 48000
        length_in_secs = 0.5
        samples = int(length_in_secs * rate)
        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
        noise = numpy.random.standard_normal(samples) * noise_amplitude
        results = audio_analysis.spectral_analysis(noise, rate)
        self.assertEqual([(0, 0)], results)

    def testEmptyData(self):
        """Checks that spectral analysis rejects empty data."""
        with self.assertRaises(audio_analysis.EmptyDataError):
            audio_analysis.spectral_analysis([], 100)
159
160
class NormalizeTest(unittest.TestCase):
    """Unit test for audio_analysis.normalize_signal."""

    def testNormalize(self):
        """Checks that [1..5] normalized against 10 yields [0.1..0.5]."""
        raw_signal = [1, 2, 3, 4, 5]
        normalized = audio_analysis.normalize_signal(raw_signal, 10)
        golden = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
        # Compare element by element, indexing into the returned signal.
        for position, wanted in enumerate(golden):
            self.assertEqual(wanted, normalized[position])
168
169
class AnomalyTest(unittest.TestCase):
    """Unit tests for audio_analysis.anomaly_detection on a sine wave.

    Each test starts from the clean sine wave built in setUp, optionally
    inserts or removes a short section and/or adds noise, and then checks
    what the anomaly detection reports.

    """

    def setUp(self):
        """Creates a test signal of sine wave."""
        # Use the same seed for each test case.
        numpy.random.seed(0)

        self.block_size = 120
        self.rate = 48000
        self.freq = 440
        length_in_secs = 0.25
        # The sample count must be an integer: numpy.linspace() rejects a
        # float for its num argument.
        self.samples = int(length_in_secs * self.rate)
        x = numpy.linspace(0.0, (self.samples - 1) * 1.0 / self.rate,
                           self.samples)
        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)

    def add_noise(self):
        """Add noise to the test signal."""
        noise_amplitude = 0.3
        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
        self.y = self.y + noise

    def insert_anomaly(self):
        """Inserts an anomaly to the test signal.

        The anomaly self.anomaly_samples should be created before calling this
        method.

        """
        self.anomaly_start_secs = 0.1
        self.y = numpy.insert(self.y,
                              int(self.anomaly_start_secs * self.rate),
                              self.anomaly_samples)

    def generate_skip_anomaly(self):
        """Skips a section of test signal.

        Removes self.anomaly_duration_secs worth of samples starting at
        self.anomaly_start_secs and joins the remaining two parts.

        """
        self.anomaly_start_secs = 0.1
        self.anomaly_duration_secs = 0.005
        anomaly_append_secs = (
            self.anomaly_start_secs + self.anomaly_duration_secs)
        anomaly_start_index = int(self.anomaly_start_secs * self.rate)
        anomaly_append_index = int(anomaly_append_secs * self.rate)
        self.y = numpy.append(self.y[:anomaly_start_index],
                              self.y[anomaly_append_index:])

    def create_constant_anomaly(self, amplitude):
        """Creates an anomaly of constant samples.

        The samples are stored in self.anomaly_samples for insert_anomaly.

        Args:
            amplitude: The amplitude of the constant samples.

        """
        self.anomaly_duration_secs = 0.005
        self.anomaly_samples = ([amplitude] *
                                int(self.anomaly_duration_secs * self.rate))

    def run_analysis(self):
        """Runs the anomaly detection and stores it in self.results."""
        self.results = audio_analysis.anomaly_detection(
            self.y, self.rate, self.freq, self.block_size)
        logging.debug('Results: %s', self.results)

    def check_no_anomaly(self):
        """Verifies that there is no anomaly in detection result."""
        self.run_analysis()
        self.assertFalse(self.results)

    def check_anomaly(self):
        """Verifies that there is anomaly in detection result.

        The detection result should contain anomaly time stamps that are
        close to where anomaly was inserted. There can be multiple anomalies
        since the detection depends on the block size.

        """
        self.run_analysis()
        self.assertTrue(self.results)
        # Anomaly can be detected as long as the detection window of block size
        # overlaps with anomaly.
        earliest_secs = (
            self.anomaly_start_secs - float(self.block_size) / self.rate)
        latest_secs = self.anomaly_start_secs + self.anomaly_duration_secs
        for detected_secs in self.results:
            self.assertLessEqual(detected_secs, latest_secs)
            self.assertGreaterEqual(detected_secs, earliest_secs)

    def testGoodSignal(self):
        """Sine wave signal with no noise or anomaly."""
        self.check_no_anomaly()

    def testGoodSignalNoise(self):
        """Sine wave signal with noise."""
        self.add_noise()
        self.check_no_anomaly()

    def testZeroAnomaly(self):
        """Sine wave signal with no noise but with anomaly.

        This test case simulates underrun in digital data where there will be
        one block of samples with 0 amplitude.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.check_anomaly()

    def testZeroAnomalyNoise(self):
        """Sine wave signal with noise and anomaly.

        This test case simulates underrun in analog data where there will be
        one block of samples with amplitudes close to 0.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def testLowConstantAnomaly(self):
        """Sine wave signal with low constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.check_anomaly()

    def testLowConstantAnomalyNoise(self):
        """Sine wave signal with low constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def testHighConstantAnomaly(self):
        """Sine wave signal with high constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.check_anomaly()

    def testHighConstantAnomalyNoise(self):
        """Sine wave signal with high constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def testSkippedAnomaly(self):
        """Sine wave signal with skipped anomaly.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.check_anomaly()

    def testSkippedAnomalyNoise(self):
        """Sine wave signal with skipped anomaly with noise.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.add_noise()
        self.check_anomaly()

    def testEmptyData(self):
        """Checks that anomaly detection rejects empty data."""
        self.y = []
        # Only the detection call itself is expected to raise.
        with self.assertRaises(audio_analysis.EmptyDataError):
            self.run_analysis()
352
353
if __name__ == '__main__':
    # Show debug-level messages with a timestamp, logger name and level
    # while the tests run.
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.DEBUG)
    unittest.main()
359