1#!/usr/bin/env python3 2# 3# Copyright 2017 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18import numpy 19import os 20import unittest 21 22import acts.test_utils.audio_analysis_lib.audio_analysis as audio_analysis 23import acts.test_utils.audio_analysis_lib.audio_data as audio_data 24 25 26class SpectralAnalysisTest(unittest.TestCase): 27 def setUp(self): 28 """Uses the same seed to generate noise for each test.""" 29 numpy.random.seed(0) 30 31 def dummy_peak_detection(self, array, window_size): 32 """Detects peaks in an array in simple way. 33 34 A point (i, array[i]) is a peak if array[i] is the maximum among 35 array[i - half_window_size] to array[i + half_window_size]. 36 If array[i - half_window_size] to array[i + half_window_size] are all 37 equal, then there is no peak in this window. 38 39 Args: 40 array: The input array to detect peaks in. Array is a list of 41 absolute values of the magnitude of transformed coefficient. 42 window_size: The window to detect peaks. 43 44 Returns: 45 A list of tuples: 46 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), 47 ...] 48 where the tuples are sorted by peak values. 49 50 """ 51 half_window_size = window_size / 2 52 length = len(array) 53 54 def mid_is_peak(array, mid, left, right): 55 """Checks if value at mid is the largest among left to right. 56 57 Args: 58 array: A list of numbers. 59 mid: The mid index. 60 left: The left index. 61 rigth: The right index. 62 63 Returns: 64 True if array[index] is the maximum among numbers in array 65 between index [left, right] inclusively. 66 67 """ 68 value_mid = array[int(mid)] 69 for index in range(int(left), int(right) + 1): 70 if index == mid: 71 continue 72 if array[index] >= value_mid: 73 return False 74 return True 75 76 results = [] 77 for mid in range(length): 78 left = max(0, mid - half_window_size) 79 right = min(length - 1, mid + half_window_size) 80 if mid_is_peak(array, mid, left, right): 81 results.append((mid, array[int(mid)])) 82 83 # Sort the peaks by values. 84 return sorted(results, key=lambda x: x[1], reverse=True) 85 86 def testPeakDetection(self): 87 array = [0, 1, 2, 3, 4, 3, 2, 1, 0, 1, 2, 3, 5, 3, 2, 1, 1, 1, 1, 1] 88 result = audio_analysis.peak_detection(array, 4) 89 golden_answer = [(12, 5), (4, 4)] 90 self.assertEqual(result, golden_answer) 91 92 def testPeakDetectionLarge(self): 93 array = numpy.random.uniform(0, 1, 1000000) 94 window_size = 100 95 logging.debug('Test large array using dummy peak detection') 96 dummy_answer = self.dummy_peak_detection(array, window_size) 97 logging.debug('Test large array using improved peak detection') 98 improved_answer = audio_analysis.peak_detection(array, window_size) 99 logging.debug('Compare the result') 100 self.assertEqual(dummy_answer, improved_answer) 101 102 def testSpectralAnalysis(self): 103 rate = 48000 104 length_in_secs = 0.5 105 freq_1 = 490.0 106 freq_2 = 60.0 107 coeff_1 = 1 108 coeff_2 = 0.3 109 samples = length_in_secs * rate 110 noise = numpy.random.standard_normal(int(samples)) * 0.005 111 x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples) 112 y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) + coeff_2 * 113 numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise 114 results = audio_analysis.spectral_analysis(y, rate) 115 # Results should contains 116 # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant 117 # frequency with coefficient 1, 60Hz is the second dominant frequency 118 # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient 119 # around 0.1. The k constant is resulted from window function. 120 logging.debug('Results: %s', results) 121 self.assertTrue(abs(results[0][0] - freq_1) < 1) 122 self.assertTrue(abs(results[1][0] - freq_2) < 1) 123 self.assertTrue( 124 abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01) 125 126 def testSpectralAnalysisRealData(self): 127 """This unittest checks the spectral analysis works on real data.""" 128 file_path = os.path.join( 129 os.path.dirname(__file__), 'test_data', '1k_2k.raw') 130 binary = open(file_path, 'rb').read() 131 data = audio_data.AudioRawData(binary, 2, 'S32_LE') 132 saturate_value = audio_data.get_maximum_value_from_sample_format( 133 'S32_LE') 134 golden_frequency = [1000, 2000] 135 for channel in [0, 1]: 136 normalized_signal = audio_analysis.normalize_signal( 137 data.channel_data[channel], saturate_value) 138 spectral = audio_analysis.spectral_analysis(normalized_signal, 139 48000, 0.02) 140 logging.debug('channel %s: %s', channel, spectral) 141 self.assertTrue( 142 abs(spectral[0][0] - golden_frequency[channel]) < 5, 143 'Dominant frequency is not correct') 144 145 def testNotMeaningfulData(self): 146 """Checks that sepectral analysis handles un-meaningful data.""" 147 rate = 48000 148 length_in_secs = 0.5 149 samples = length_in_secs * rate 150 noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5 151 noise = numpy.random.standard_normal(int(samples)) * noise_amplitude 152 results = audio_analysis.spectral_analysis(noise, rate) 153 self.assertEqual([(0, 0)], results) 154 155 def testEmptyData(self): 156 """Checks that sepectral analysis rejects empty data.""" 157 with self.assertRaises(audio_analysis.EmptyDataError): 158 results = audio_analysis.spectral_analysis([], 100) 159 160 161class NormalizeTest(unittest.TestCase): 162 def testNormalize(self): 163 y = [1, 2, 3, 4, 5] 164 normalized_y = audio_analysis.normalize_signal(y, 10) 165 expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5]) 166 for i in range(len(y)): 167 self.assertEqual(expected[i], normalized_y[i]) 168 169 170class AnomalyTest(unittest.TestCase): 171 def setUp(self): 172 """Creates a test signal of sine wave.""" 173 # Use the same seed for each test case. 174 numpy.random.seed(0) 175 176 self.block_size = 120 177 self.rate = 48000 178 self.freq = 440 179 length_in_secs = 0.25 180 self.samples = length_in_secs * self.rate 181 x = numpy.linspace(0.0, (self.samples - 1) * 1.0 / self.rate, 182 self.samples) 183 self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x) 184 185 def add_noise(self): 186 """Add noise to the test signal.""" 187 noise_amplitude = 0.3 188 noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude 189 self.y = self.y + noise 190 191 def insert_anomaly(self): 192 """Inserts an anomaly to the test signal. 193 194 The anomaly self.anomaly_samples should be created before calling this 195 method. 196 197 """ 198 self.anomaly_start_secs = 0.1 199 self.y = numpy.insert(self.y, 200 int(self.anomaly_start_secs * self.rate), 201 self.anomaly_samples) 202 203 def generate_skip_anomaly(self): 204 """Skips a section of test signal.""" 205 self.anomaly_start_secs = 0.1 206 self.anomaly_duration_secs = 0.005 207 anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs 208 anomaly_start_index = self.anomaly_start_secs * self.rate 209 anomaly_append_index = anomaly_append_secs * self.rate 210 self.y = numpy.append(self.y[:int(anomaly_start_index)], 211 self.y[int(anomaly_append_index):]) 212 213 def create_constant_anomaly(self, amplitude): 214 """Creates an anomaly of constant samples. 215 216 Args: 217 amplitude: The amplitude of the constant samples. 218 219 """ 220 self.anomaly_duration_secs = 0.005 221 self.anomaly_samples = ([amplitude] * 222 int(self.anomaly_duration_secs * self.rate)) 223 224 def run_analysis(self): 225 """Runs the anomaly detection.""" 226 self.results = audio_analysis.anomaly_detection( 227 self.y, self.rate, self.freq, self.block_size) 228 logging.debug('Results: %s', self.results) 229 230 def check_no_anomaly(self): 231 """Verifies that there is no anomaly in detection result.""" 232 self.run_analysis() 233 self.assertFalse(self.results) 234 235 def check_anomaly(self): 236 """Verifies that there is anomaly in detection result. 237 238 The detection result should contain anomaly time stamps that are 239 close to where anomaly was inserted. There can be multiple anomalies 240 since the detection depends on the block size. 241 242 """ 243 self.run_analysis() 244 self.assertTrue(self.results) 245 # Anomaly can be detected as long as the detection window of block size 246 # overlaps with anomaly. 247 expected_detected_range_secs = ( 248 self.anomaly_start_secs - float(self.block_size) / self.rate, 249 self.anomaly_start_secs + self.anomaly_duration_secs) 250 for detected_secs in self.results: 251 self.assertTrue(detected_secs <= expected_detected_range_secs[1]) 252 self.assertTrue(detected_secs >= expected_detected_range_secs[0]) 253 254 def testGoodSignal(self): 255 """Sine wave signal with no noise or anomaly.""" 256 self.check_no_anomaly() 257 258 def testGoodSignalNoise(self): 259 """Sine wave signal with noise.""" 260 self.add_noise() 261 self.check_no_anomaly() 262 263 def testZeroAnomaly(self): 264 """Sine wave signal with no noise but with anomaly. 265 266 This test case simulates underrun in digital data where there will be 267 one block of samples with 0 amplitude. 268 269 """ 270 self.create_constant_anomaly(0) 271 self.insert_anomaly() 272 self.check_anomaly() 273 274 def testZeroAnomalyNoise(self): 275 """Sine wave signal with noise and anomaly. 276 277 This test case simulates underrun in analog data where there will be 278 one block of samples with amplitudes close to 0. 279 280 """ 281 self.create_constant_anomaly(0) 282 self.insert_anomaly() 283 self.add_noise() 284 self.check_anomaly() 285 286 def testLowConstantAnomaly(self): 287 """Sine wave signal with low constant anomaly. 288 289 The anomaly is one block of constant values. 290 291 """ 292 self.create_constant_anomaly(0.05) 293 self.insert_anomaly() 294 self.check_anomaly() 295 296 def testLowConstantAnomalyNoise(self): 297 """Sine wave signal with low constant anomaly and noise. 298 299 The anomaly is one block of constant values. 300 301 """ 302 self.create_constant_anomaly(0.05) 303 self.insert_anomaly() 304 self.add_noise() 305 self.check_anomaly() 306 307 def testHighConstantAnomaly(self): 308 """Sine wave signal with high constant anomaly. 309 310 The anomaly is one block of constant values. 311 312 """ 313 self.create_constant_anomaly(2) 314 self.insert_anomaly() 315 self.check_anomaly() 316 317 def testHighConstantAnomalyNoise(self): 318 """Sine wave signal with high constant anomaly and noise. 319 320 The anomaly is one block of constant values. 321 322 """ 323 self.create_constant_anomaly(2) 324 self.insert_anomaly() 325 self.add_noise() 326 self.check_anomaly() 327 328 def testSkippedAnomaly(self): 329 """Sine wave signal with skipped anomaly. 330 331 The anomaly simulates the symptom where a block is skipped. 332 333 """ 334 self.generate_skip_anomaly() 335 self.check_anomaly() 336 337 def testSkippedAnomalyNoise(self): 338 """Sine wave signal with skipped anomaly with noise. 339 340 The anomaly simulates the symptom where a block is skipped. 341 342 """ 343 self.generate_skip_anomaly() 344 self.add_noise() 345 self.check_anomaly() 346 347 def testEmptyData(self): 348 """Checks that anomaly detection rejects empty data.""" 349 self.y = [] 350 with self.assertRaises(audio_analysis.EmptyDataError): 351 self.check_anomaly() 352 353 354if __name__ == '__main__': 355 logging.basicConfig( 356 level=logging.DEBUG, 357 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') 358 unittest.main() 359