1#!/usr/bin/env python3
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17# Note: This test has been labelled as an integration test due to its use of
18# real data, and the five to six second execution time.
19import logging
20import numpy
21import os
22import unittest
23
24# TODO(markdr): Remove this after soundfile is added to setup.py
25import sys
26import mock
27sys.modules['soundfile'] = mock.Mock()
28
29import acts.test_utils.audio_analysis_lib.audio_analysis as audio_analysis
30import acts.test_utils.audio_analysis_lib.audio_data as audio_data
31
32
33class SpectralAnalysisTest(unittest.TestCase):
34    def setUp(self):
35        """Uses the same seed to generate noise for each test."""
36        numpy.random.seed(0)
37
38    def dummy_peak_detection(self, array, window_size):
39        """Detects peaks in an array in simple way.
40
41        A point (i, array[i]) is a peak if array[i] is the maximum among
42        array[i - half_window_size] to array[i + half_window_size].
43        If array[i - half_window_size] to array[i + half_window_size] are all
44        equal, then there is no peak in this window.
45
46        Args:
47            array: The input array to detect peaks in. Array is a list of
48                absolute values of the magnitude of transformed coefficient.
49            window_size: The window to detect peaks.
50
51        Returns:
52            A list of tuples:
53                [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
54                ...]
55                where the tuples are sorted by peak values.
56
57        """
58        half_window_size = window_size / 2
59        length = len(array)
60
61        def mid_is_peak(array, mid, left, right):
62            """Checks if value at mid is the largest among left to right.
63
64            Args:
65                array: A list of numbers.
66                mid: The mid index.
67                left: The left index.
68                rigth: The right index.
69
70            Returns:
71                True if array[index] is the maximum among numbers in array
72                    between index [left, right] inclusively.
73
74            """
75            value_mid = array[int(mid)]
76            for index in range(int(left), int(right) + 1):
77                if index == mid:
78                    continue
79                if array[index] >= value_mid:
80                    return False
81            return True
82
83        results = []
84        for mid in range(length):
85            left = max(0, mid - half_window_size)
86            right = min(length - 1, mid + half_window_size)
87            if mid_is_peak(array, mid, left, right):
88                results.append((mid, array[int(mid)]))
89
90        # Sort the peaks by values.
91        return sorted(results, key=lambda x: x[1], reverse=True)
92
93    def test_peak_detection(self):
94        array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
95        result = audio_analysis.peak_detection(array, 4)
96        golden_answer = [(12, 5), (4, 4)]
97        self.assertEqual(result, golden_answer)
98
99    def test_peak_detection_large(self):
100        array = numpy.random.uniform(0, 1, 1000000)
101        window_size = 100
102        logging.debug('Test large array using dummy peak detection')
103        dummy_answer = self.dummy_peak_detection(array, window_size)
104        logging.debug('Test large array using improved peak detection')
105        improved_answer = audio_analysis.peak_detection(array, window_size)
106        logging.debug('Compare the result')
107        self.assertEqual(dummy_answer, improved_answer)
108
109    def test_spectral_analysis(self):
110        rate = 48000
111        length_in_secs = 0.5
112        freq_1 = 490.0
113        freq_2 = 60.0
114        coeff_1 = 1
115        coeff_2 = 0.3
116        samples = length_in_secs * rate
117        noise = numpy.random.standard_normal(int(samples)) * 0.005
118        x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
119        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) + coeff_2 *
120             numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
121        results = audio_analysis.spectral_analysis(y, rate)
122        # Results should contains
123        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
124        # frequency with coefficient 1, 60Hz is the second dominant frequency
125        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
126        # around 0.1. The k constant is resulted from window function.
127        logging.debug('Results: %s', results)
128        self.assertTrue(abs(results[0][0] - freq_1) < 1)
129        self.assertTrue(abs(results[1][0] - freq_2) < 1)
130        self.assertTrue(
131            abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01)
132
133    def test_spectral_snalysis_real_data(self):
134        """This unittest checks the spectral analysis works on real data."""
135        file_path = os.path.join(
136            os.path.dirname(__file__), 'test_data', '1k_2k.raw')
137        binary = open(file_path, 'rb').read()
138        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
139        saturate_value = audio_data.get_maximum_value_from_sample_format(
140            'S32_LE')
141        golden_frequency = [1000, 2000]
142        for channel in [0, 1]:
143            normalized_signal = audio_analysis.normalize_signal(
144                data.channel_data[channel], saturate_value)
145            spectral = audio_analysis.spectral_analysis(normalized_signal,
146                                                        48000, 0.02)
147            logging.debug('channel %s: %s', channel, spectral)
148            self.assertTrue(
149                abs(spectral[0][0] - golden_frequency[channel]) < 5,
150                'Dominant frequency is not correct')
151
152    def test_not_meaningful_data(self):
153        """Checks that sepectral analysis handles un-meaningful data."""
154        rate = 48000
155        length_in_secs = 0.5
156        samples = length_in_secs * rate
157        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
158        noise = numpy.random.standard_normal(int(samples)) * noise_amplitude
159        results = audio_analysis.spectral_analysis(noise, rate)
160        self.assertEqual([(0, 0)], results)
161
162    def testEmptyData(self):
163        """Checks that sepectral analysis rejects empty data."""
164        with self.assertRaises(audio_analysis.EmptyDataError):
165            results = audio_analysis.spectral_analysis([], 100)
166
167
168class NormalizeTest(unittest.TestCase):
169    def test_normalize(self):
170        y = [1, 2, 3, 4, 5]
171        normalized_y = audio_analysis.normalize_signal(y, 10)
172        expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
173        for i in range(len(y)):
174            self.assertEqual(expected[i], normalized_y[i])
175
176
177class AnomalyTest(unittest.TestCase):
178    def setUp(self):
179        """Creates a test signal of sine wave."""
180        # Use the same seed for each test case.
181        numpy.random.seed(0)
182
183        self.block_size = 120
184        self.rate = 48000
185        self.freq = 440
186        length_in_secs = 0.25
187        self.samples = length_in_secs * self.rate
188        x = numpy.linspace(0.0, (self.samples - 1) * 1.0 / self.rate,
189                           self.samples)
190        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)
191
192    def add_noise(self):
193        """Add noise to the test signal."""
194        noise_amplitude = 0.3
195        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
196        self.y = self.y + noise
197
198    def insert_anomaly(self):
199        """Inserts an anomaly to the test signal.
200
201        The anomaly self.anomaly_samples should be created before calling this
202        method.
203
204        """
205        self.anomaly_start_secs = 0.1
206        self.y = numpy.insert(self.y,
207                              int(self.anomaly_start_secs * self.rate),
208                              self.anomaly_samples)
209
210    def generate_skip_anomaly(self):
211        """Skips a section of test signal."""
212        self.anomaly_start_secs = 0.1
213        self.anomaly_duration_secs = 0.005
214        anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs
215        anomaly_start_index = self.anomaly_start_secs * self.rate
216        anomaly_append_index = anomaly_append_secs * self.rate
217        self.y = numpy.append(self.y[:int(anomaly_start_index)],
218                              self.y[int(anomaly_append_index):])
219
220    def create_constant_anomaly(self, amplitude):
221        """Creates an anomaly of constant samples.
222
223        Args:
224            amplitude: The amplitude of the constant samples.
225
226        """
227        self.anomaly_duration_secs = 0.005
228        self.anomaly_samples = ([amplitude] *
229                                int(self.anomaly_duration_secs * self.rate))
230
231    def run_analysis(self):
232        """Runs the anomaly detection."""
233        self.results = audio_analysis.anomaly_detection(
234            self.y, self.rate, self.freq, self.block_size)
235        logging.debug('Results: %s', self.results)
236
237    def check_no_anomaly(self):
238        """Verifies that there is no anomaly in detection result."""
239        self.run_analysis()
240        self.assertFalse(self.results)
241
242    def check_anomaly(self):
243        """Verifies that there is anomaly in detection result.
244
245        The detection result should contain anomaly time stamps that are
246        close to where anomaly was inserted. There can be multiple anomalies
247        since the detection depends on the block size.
248
249        """
250        self.run_analysis()
251        self.assertTrue(self.results)
252        # Anomaly can be detected as long as the detection window of block size
253        # overlaps with anomaly.
254        expected_detected_range_secs = (
255            self.anomaly_start_secs - float(self.block_size) / self.rate,
256            self.anomaly_start_secs + self.anomaly_duration_secs)
257        for detected_secs in self.results:
258            self.assertTrue(detected_secs <= expected_detected_range_secs[1])
259            self.assertTrue(detected_secs >= expected_detected_range_secs[0])
260
261    def test_good_signal(self):
262        """Sine wave signal with no noise or anomaly."""
263        self.check_no_anomaly()
264
265    def test_good_signal_noise(self):
266        """Sine wave signal with noise."""
267        self.add_noise()
268        self.check_no_anomaly()
269
270    def test_zero_anomaly(self):
271        """Sine wave signal with no noise but with anomaly.
272
273        This test case simulates underrun in digital data where there will be
274        one block of samples with 0 amplitude.
275
276        """
277        self.create_constant_anomaly(0)
278        self.insert_anomaly()
279        self.check_anomaly()
280
281    def test_zero_anomaly_noise(self):
282        """Sine wave signal with noise and anomaly.
283
284        This test case simulates underrun in analog data where there will be
285        one block of samples with amplitudes close to 0.
286
287        """
288        self.create_constant_anomaly(0)
289        self.insert_anomaly()
290        self.add_noise()
291        self.check_anomaly()
292
293    def test_low_constant_anomaly(self):
294        """Sine wave signal with low constant anomaly.
295
296        The anomaly is one block of constant values.
297
298        """
299        self.create_constant_anomaly(0.05)
300        self.insert_anomaly()
301        self.check_anomaly()
302
303    def test_low_constant_anomaly_noise(self):
304        """Sine wave signal with low constant anomaly and noise.
305
306        The anomaly is one block of constant values.
307
308        """
309        self.create_constant_anomaly(0.05)
310        self.insert_anomaly()
311        self.add_noise()
312        self.check_anomaly()
313
314    def test_high_constant_anomaly(self):
315        """Sine wave signal with high constant anomaly.
316
317        The anomaly is one block of constant values.
318
319        """
320        self.create_constant_anomaly(2)
321        self.insert_anomaly()
322        self.check_anomaly()
323
324    def test_high_constant_anomaly_noise(self):
325        """Sine wave signal with high constant anomaly and noise.
326
327        The anomaly is one block of constant values.
328
329        """
330        self.create_constant_anomaly(2)
331        self.insert_anomaly()
332        self.add_noise()
333        self.check_anomaly()
334
335    def test_skipped_anomaly(self):
336        """Sine wave signal with skipped anomaly.
337
338        The anomaly simulates the symptom where a block is skipped.
339
340        """
341        self.generate_skip_anomaly()
342        self.check_anomaly()
343
344    def test_skipped_anomaly_noise(self):
345        """Sine wave signal with skipped anomaly with noise.
346
347        The anomaly simulates the symptom where a block is skipped.
348
349        """
350        self.generate_skip_anomaly()
351        self.add_noise()
352        self.check_anomaly()
353
354    def test_empty_data(self):
355        """Checks that anomaly detection rejects empty data."""
356        self.y = []
357        with self.assertRaises(audio_analysis.EmptyDataError):
358            self.check_anomaly()
359
360
361if __name__ == '__main__':
362    logging.basicConfig(
363        level=logging.DEBUG,
364        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
365    unittest.main()
366