#!/usr/bin/env python3
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Note: This test has been labelled as an integration test due to its use of
# real data, and the five to six second execution time.
import logging
import os
import sys
import unittest
from unittest import mock

import numpy

# TODO(markdr): Remove this after soundfile is added to setup.py
# The stub must be registered before the acts imports below are executed.
sys.modules['soundfile'] = mock.Mock()

import acts.test_utils.audio_analysis_lib.audio_analysis as audio_analysis
import acts.test_utils.audio_analysis_lib.audio_data as audio_data


class SpectralAnalysisTest(unittest.TestCase):
    """Tests audio_analysis.peak_detection and spectral_analysis."""

    def setUp(self):
        """Uses the same seed to generate noise for each test."""
        numpy.random.seed(0)

    def dummy_peak_detection(self, array, window_size):
        """Detects peaks in an array in simple way.

        A point (i, array[i]) is a peak if array[i] is the maximum among
        array[i - half_window_size] to array[i + half_window_size].
        If array[i - half_window_size] to array[i + half_window_size] are all
        equal, then there is no peak in this window.

        Args:
            array: The input array to detect peaks in. Array is a list of
                   absolute values of the magnitude of transformed coefficient.
            window_size: The window to detect peaks.

        Returns:
            A list of tuples:
                [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2),
                 ...]
            where the tuples are sorted by peak values, largest first.

        """
        # True division is kept on purpose: the window bounds are truncated
        # with int() where they are used, which is what this reference
        # implementation has always done.
        half_window_size = window_size / 2
        length = len(array)

        def mid_is_peak(array, mid, left, right):
            """Checks if value at mid is the largest among left to right.

            Args:
                array: A list of numbers.
                mid: The mid index.
                left: The left index.
                right: The right index.

            Returns:
                True if array[mid] is strictly greater than every other
                number in array between index [left, right] inclusively.

            """
            value_mid = array[mid]
            for index in range(int(left), int(right) + 1):
                if index == mid:
                    continue
                # A tie disqualifies mid, so a flat window has no peak.
                if array[index] >= value_mid:
                    return False
            return True

        results = []
        for mid in range(length):
            # Clamp the window to the bounds of the array.
            left = max(0, mid - half_window_size)
            right = min(length - 1, mid + half_window_size)
            if mid_is_peak(array, mid, left, right):
                results.append((mid, array[mid]))

        # Sort the peaks by values.
        return sorted(results, key=lambda x: x[1], reverse=True)

    def test_peak_detection(self):
        """Checks peak detection against a small hand-computed answer."""
        array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1]
        result = audio_analysis.peak_detection(array, 4)
        golden_answer = [(12, 5), (4, 4)]
        self.assertEqual(result, golden_answer)

    def test_peak_detection_large(self):
        """Compares peak detection with the dummy version on a large array."""
        array = numpy.random.uniform(0, 1, 1000000)
        window_size = 100
        logging.debug('Test large array using dummy peak detection')
        dummy_answer = self.dummy_peak_detection(array, window_size)
        logging.debug('Test large array using improved peak detection')
        improved_answer = audio_analysis.peak_detection(array, window_size)
        logging.debug('Compare the result')
        self.assertEqual(dummy_answer, improved_answer)

    def test_spectral_analysis(self):
        """Checks the two dominant frequencies of a synthetic signal."""
        rate = 48000
        length_in_secs = 0.5
        freq_1 = 490.0
        freq_2 = 60.0
        coeff_1 = 1
        coeff_2 = 0.3
        # numpy.linspace requires an integer number of samples.
        samples = int(length_in_secs * rate)
        noise = numpy.random.standard_normal(samples) * 0.005
        x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples)
        y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) +
             coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise
        results = audio_analysis.spectral_analysis(y, rate)
        # Results should contain
        # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant
        # frequency with coefficient 1, 60Hz is the second dominant frequency
        # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient
        # around 0.1. The k constant results from the window function.
        logging.debug('Results: %s', results)
        self.assertLess(abs(results[0][0] - freq_1), 1)
        self.assertLess(abs(results[1][0] - freq_2), 1)
        self.assertLess(
            abs(results[0][1] / results[1][1] - coeff_1 / coeff_2), 0.01)

    def test_spectral_snalysis_real_data(self):
        """This unittest checks the spectral analysis works on real data."""
        file_path = os.path.join(
            os.path.dirname(__file__), 'test_data', '1k_2k.raw')
        # Close the file deterministically instead of leaking the handle.
        with open(file_path, 'rb') as raw_file:
            binary = raw_file.read()
        data = audio_data.AudioRawData(binary, 2, 'S32_LE')
        saturate_value = audio_data.get_maximum_value_from_sample_format(
            'S32_LE')
        # Expected dominant frequency of channel 0 and channel 1.
        golden_frequency = [1000, 2000]
        for channel in [0, 1]:
            normalized_signal = audio_analysis.normalize_signal(
                data.channel_data[channel], saturate_value)
            spectral = audio_analysis.spectral_analysis(normalized_signal,
                                                        48000, 0.02)
            logging.debug('channel %s: %s', channel, spectral)
            self.assertLess(
                abs(spectral[0][0] - golden_frequency[channel]), 5,
                'Dominant frequency is not correct')

    def test_not_meaningful_data(self):
        """Checks that spectral analysis handles un-meaningful data."""
        rate = 48000
        length_in_secs = 0.5
        samples = int(length_in_secs * rate)
        # Noise scaled to half of the meaningful RMS threshold.
        noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5
        noise = numpy.random.standard_normal(samples) * noise_amplitude
        results = audio_analysis.spectral_analysis(noise, rate)
        self.assertEqual([(0, 0)], results)

    def testEmptyData(self):
        """Checks that spectral analysis rejects empty data."""
        with self.assertRaises(audio_analysis.EmptyDataError):
            audio_analysis.spectral_analysis([], 100)


class NormalizeTest(unittest.TestCase):
    """Tests audio_analysis.normalize_signal."""

    def test_normalize(self):
        """Checks each sample is scaled by the given saturate value."""
        y = [1, 2, 3, 4, 5]
        normalized_y = audio_analysis.normalize_signal(y, 10)
        expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5])
        for i, expected_value in enumerate(expected):
            self.assertEqual(expected_value, normalized_y[i])


class AnomalyTest(unittest.TestCase):
    """Tests audio_analysis.anomaly_detection on a sine wave test signal."""

    def setUp(self):
        """Creates a test signal of sine wave."""
        # Use the same seed for each test case.
        numpy.random.seed(0)

        self.block_size = 120
        self.rate = 48000
        self.freq = 440
        length_in_secs = 0.25
        # numpy.linspace requires an integer number of samples.
        self.samples = int(length_in_secs * self.rate)
        x = numpy.linspace(0.0, (self.samples - 1) * 1.0 / self.rate,
                           self.samples)
        self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x)

    def add_noise(self):
        """Add noise to the test signal."""
        noise_amplitude = 0.3
        noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude
        self.y = self.y + noise

    def insert_anomaly(self):
        """Inserts an anomaly to the test signal.

        The anomaly self.anomaly_samples should be created before calling this
        method.

        """
        self.anomaly_start_secs = 0.1
        self.y = numpy.insert(self.y,
                              int(self.anomaly_start_secs * self.rate),
                              self.anomaly_samples)

    def generate_skip_anomaly(self):
        """Skips a section of test signal."""
        self.anomaly_start_secs = 0.1
        self.anomaly_duration_secs = 0.005
        anomaly_append_secs = (
            self.anomaly_start_secs + self.anomaly_duration_secs)
        anomaly_start_index = self.anomaly_start_secs * self.rate
        anomaly_append_index = anomaly_append_secs * self.rate
        # Drop the samples between the start index and the append index.
        self.y = numpy.append(self.y[:int(anomaly_start_index)],
                              self.y[int(anomaly_append_index):])

    def create_constant_anomaly(self, amplitude):
        """Creates an anomaly of constant samples.

        Args:
            amplitude: The amplitude of the constant samples.

        """
        self.anomaly_duration_secs = 0.005
        self.anomaly_samples = (
            [amplitude] * int(self.anomaly_duration_secs * self.rate))

    def run_analysis(self):
        """Runs the anomaly detection."""
        self.results = audio_analysis.anomaly_detection(
            self.y, self.rate, self.freq, self.block_size)
        logging.debug('Results: %s', self.results)

    def check_no_anomaly(self):
        """Verifies that there is no anomaly in detection result."""
        self.run_analysis()
        self.assertFalse(self.results)

    def check_anomaly(self):
        """Verifies that there is anomaly in detection result.

        The detection result should contain anomaly time stamps that are
        close to where anomaly was inserted. There can be multiple anomalies
        since the detection depends on the block size.

        """
        self.run_analysis()
        self.assertTrue(self.results)
        # Anomaly can be detected as long as the detection window of block size
        # overlaps with anomaly.
        earliest_secs = (
            self.anomaly_start_secs - float(self.block_size) / self.rate)
        latest_secs = self.anomaly_start_secs + self.anomaly_duration_secs
        for detected_secs in self.results:
            self.assertLessEqual(detected_secs, latest_secs)
            self.assertGreaterEqual(detected_secs, earliest_secs)

    def test_good_signal(self):
        """Sine wave signal with no noise or anomaly."""
        self.check_no_anomaly()

    def test_good_signal_noise(self):
        """Sine wave signal with noise."""
        self.add_noise()
        self.check_no_anomaly()

    def test_zero_anomaly(self):
        """Sine wave signal with no noise but with anomaly.

        This test case simulates underrun in digital data where there will be
        one block of samples with 0 amplitude.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.check_anomaly()

    def test_zero_anomaly_noise(self):
        """Sine wave signal with noise and anomaly.

        This test case simulates underrun in analog data where there will be
        one block of samples with amplitudes close to 0.

        """
        self.create_constant_anomaly(0)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_low_constant_anomaly(self):
        """Sine wave signal with low constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.check_anomaly()

    def test_low_constant_anomaly_noise(self):
        """Sine wave signal with low constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(0.05)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_high_constant_anomaly(self):
        """Sine wave signal with high constant anomaly.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.check_anomaly()

    def test_high_constant_anomaly_noise(self):
        """Sine wave signal with high constant anomaly and noise.

        The anomaly is one block of constant values.

        """
        self.create_constant_anomaly(2)
        self.insert_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_skipped_anomaly(self):
        """Sine wave signal with skipped anomaly.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.check_anomaly()

    def test_skipped_anomaly_noise(self):
        """Sine wave signal with skipped anomaly with noise.

        The anomaly simulates the symptom where a block is skipped.

        """
        self.generate_skip_anomaly()
        self.add_noise()
        self.check_anomaly()

    def test_empty_data(self):
        """Checks that anomaly detection rejects empty data."""
        self.y = []
        # Only the detection call is expected to raise, so run it directly.
        with self.assertRaises(audio_analysis.EmptyDataError):
            self.run_analysis()


if __name__ == '__main__':
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    unittest.main()