1#!/usr/bin/python 2import logging 3import numpy 4import unittest 5 6import common 7from autotest_lib.client.cros.audio import audio_analysis 8from autotest_lib.client.cros.audio import audio_data 9 10class SpectralAnalysisTest(unittest.TestCase): 11 def setUp(self): 12 """Uses the same seed to generate noise for each test.""" 13 numpy.random.seed(0) 14 15 16 def testSpectralAnalysis(self): 17 rate = 48000 18 length_in_secs = 0.5 19 freq_1 = 490.0 20 freq_2 = 60.0 21 coeff_1 = 1 22 coeff_2 = 0.3 23 samples = length_in_secs * rate 24 noise = numpy.random.standard_normal(samples) * 0.005 25 x = numpy.linspace(0.0, (samples - 1) * 1.0 / rate, samples) 26 y = (coeff_1 * numpy.sin(freq_1 * 2.0 * numpy.pi * x) + 27 coeff_2 * numpy.sin(freq_2 * 2.0 * numpy.pi * x)) + noise 28 results = audio_analysis.spectral_analysis(y, rate) 29 # Results should contains 30 # [(490, 1*k), (60, 0.3*k), (0, 0.1*k)] where 490Hz is the dominant 31 # frequency with coefficient 1, 60Hz is the second dominant frequency 32 # with coefficient 0.3, 0Hz is from Gaussian noise with coefficient 33 # around 0.1. The k constant is resulted from window function. 34 logging.debug('Results: %s', results) 35 self.assertTrue(abs(results[0][0]-freq_1) < 1) 36 self.assertTrue(abs(results[1][0]-freq_2) < 1) 37 self.assertTrue( 38 abs(results[0][1] / results[1][1] - coeff_1 / coeff_2) < 0.01) 39 40 41 def testSpectralAnalysisRealData(self): 42 """This unittest checks the spectral analysis works on real data.""" 43 binary = open('client/cros/audio/test_data/1k_2k.raw', 'r').read() 44 data = audio_data.AudioRawData(binary, 2, 'S32_LE') 45 saturate_value = audio_data.get_maximum_value_from_sample_format( 46 'S32_LE') 47 golden_frequency = [1000, 2000] 48 for channel in [0, 1]: 49 normalized_signal = audio_analysis.normalize_signal( 50 data.channel_data[channel],saturate_value) 51 spectral = audio_analysis.spectral_analysis( 52 normalized_signal, 48000, 0.02) 53 logging.debug('channel %s: %s', channel, spectral) 54 self.assertTrue(abs(spectral[0][0] - golden_frequency[channel]) < 5, 55 'Dominant frequency is not correct') 56 57 58 def testNotMeaningfulData(self): 59 """Checks that sepectral analysis rejects not meaningful data.""" 60 rate = 48000 61 length_in_secs = 0.5 62 samples = length_in_secs * rate 63 noise_amplitude = audio_analysis.MEANINGFUL_RMS_THRESHOLD * 0.5 64 noise = numpy.random.standard_normal(samples) * noise_amplitude 65 with self.assertRaises(audio_analysis.RMSTooSmallError): 66 results = audio_analysis.spectral_analysis(noise, rate) 67 68 69 def testEmptyData(self): 70 """Checks that sepectral analysis rejects empty data.""" 71 with self.assertRaises(audio_analysis.EmptyDataError): 72 results = audio_analysis.spectral_analysis([], 100) 73 74 75class NormalizeTest(unittest.TestCase): 76 def testNormalize(self): 77 y = [1, 2, 3, 4, 5] 78 normalized_y = audio_analysis.normalize_signal(y, 10) 79 expected = numpy.array([0.1, 0.2, 0.3, 0.4, 0.5]) 80 for i in xrange(len(y)): 81 self.assertEqual(expected[i], normalized_y[i]) 82 83 84class AnomalyTest(unittest.TestCase): 85 def setUp(self): 86 """Creates a test signal of sine wave.""" 87 # Use the same seed for each test case. 88 numpy.random.seed(0) 89 90 self.block_size = 120 91 self.rate = 48000 92 self.freq = 440 93 length_in_secs = 0.25 94 self.samples = length_in_secs * self.rate 95 x = numpy.linspace( 96 0.0, (self.samples - 1) * 1.0 / self.rate, self.samples) 97 self.y = numpy.sin(self.freq * 2.0 * numpy.pi * x) 98 99 100 def add_noise(self): 101 """Add noise to the test signal.""" 102 noise_amplitude = 0.3 103 noise = numpy.random.standard_normal(len(self.y)) * noise_amplitude 104 self.y = self.y + noise 105 106 107 def insert_anomaly(self): 108 """Inserts an anomaly to the test signal. 109 110 The anomaly self.anomaly_samples should be created before calling this 111 method. 112 113 """ 114 self.anomaly_start_secs = 0.1 115 self.y = numpy.insert(self.y, int(self.anomaly_start_secs * self.rate), 116 self.anomaly_samples) 117 118 119 def generate_skip_anomaly(self): 120 """Skips a section of test signal.""" 121 self.anomaly_start_secs = 0.1 122 self.anomaly_duration_secs = 0.005 123 anomaly_append_secs = self.anomaly_start_secs + self.anomaly_duration_secs 124 anomaly_start_index = self.anomaly_start_secs * self.rate 125 anomaly_append_index = anomaly_append_secs * self.rate 126 self.y = numpy.append(self.y[:anomaly_start_index], self.y[anomaly_append_index:]) 127 128 129 def create_constant_anomaly(self, amplitude): 130 """Creates an anomaly of constant samples. 131 132 @param amplitude: The amplitude of the constant samples. 133 134 """ 135 self.anomaly_duration_secs = 0.005 136 self.anomaly_samples = ( 137 [amplitude] * int(self.anomaly_duration_secs * self.rate)) 138 139 140 def run_analysis(self): 141 """Runs the anomaly detection.""" 142 self.results = audio_analysis.anomaly_detection( 143 self.y, self.rate, self.freq, self.block_size) 144 logging.debug('Results: %s', self.results) 145 146 147 def check_no_anomaly(self): 148 """Verifies that there is no anomaly in detection result.""" 149 self.run_analysis() 150 self.assertFalse(self.results) 151 152 153 def check_anomaly(self): 154 """Verifies that there is anomaly in detection result. 155 156 The detection result should contain anomaly time stamps that are 157 close to where anomaly was inserted. There can be multiple anomalies 158 since the detection depends on the block size. 159 160 """ 161 self.run_analysis() 162 self.assertTrue(self.results) 163 # Anomaly can be detected as long as the detection window of block size 164 # overlaps with anomaly. 165 expected_detected_range_secs = ( 166 self.anomaly_start_secs - float(self.block_size) / self.rate, 167 self.anomaly_start_secs + self.anomaly_duration_secs) 168 for detected_secs in self.results: 169 self.assertTrue(detected_secs <= expected_detected_range_secs[1]) 170 self.assertTrue(detected_secs >= expected_detected_range_secs[0] ) 171 172 173 def testGoodSignal(self): 174 """Sine wave signal with no noise or anomaly.""" 175 self.check_no_anomaly() 176 177 178 def testGoodSignalNoise(self): 179 """Sine wave signal with noise.""" 180 self.add_noise() 181 self.check_no_anomaly() 182 183 184 def testZeroAnomaly(self): 185 """Sine wave signal with no noise but with anomaly. 186 187 This test case simulates underrun in digital data where there will be 188 one block of samples with 0 amplitude. 189 190 """ 191 self.create_constant_anomaly(0) 192 self.insert_anomaly() 193 self.check_anomaly() 194 195 196 def testZeroAnomalyNoise(self): 197 """Sine wave signal with noise and anomaly. 198 199 This test case simulates underrun in analog data where there will be 200 one block of samples with amplitudes close to 0. 201 202 """ 203 self.create_constant_anomaly(0) 204 self.insert_anomaly() 205 self.add_noise() 206 self.check_anomaly() 207 208 209 def testLowConstantAnomaly(self): 210 """Sine wave signal with low constant anomaly. 211 212 The anomaly is one block of constant values. 213 214 """ 215 self.create_constant_anomaly(0.05) 216 self.insert_anomaly() 217 self.check_anomaly() 218 219 220 def testLowConstantAnomalyNoise(self): 221 """Sine wave signal with low constant anomaly and noise. 222 223 The anomaly is one block of constant values. 224 225 """ 226 self.create_constant_anomaly(0.05) 227 self.insert_anomaly() 228 self.add_noise() 229 self.check_anomaly() 230 231 232 def testHighConstantAnomaly(self): 233 """Sine wave signal with high constant anomaly. 234 235 The anomaly is one block of constant values. 236 237 """ 238 self.create_constant_anomaly(2) 239 self.insert_anomaly() 240 self.check_anomaly() 241 242 243 def testHighConstantAnomalyNoise(self): 244 """Sine wave signal with high constant anomaly and noise. 245 246 The anomaly is one block of constant values. 247 248 """ 249 self.create_constant_anomaly(2) 250 self.insert_anomaly() 251 self.add_noise() 252 self.check_anomaly() 253 254 255 def testSkippedAnomaly(self): 256 """Sine wave signal with skipped anomaly. 257 258 The anomaly simulates the symptom where a block is skipped. 259 260 """ 261 self.generate_skip_anomaly() 262 self.check_anomaly() 263 264 265 def testSkippedAnomalyNoise(self): 266 """Sine wave signal with skipped anomaly with noise. 267 268 The anomaly simulates the symptom where a block is skipped. 269 270 """ 271 self.generate_skip_anomaly() 272 self.add_noise() 273 self.check_anomaly() 274 275 276 def testEmptyData(self): 277 """Checks that anomaly detection rejects empty data.""" 278 self.y = [] 279 with self.assertRaises(audio_analysis.EmptyDataError): 280 self.check_anomaly() 281 282 283if __name__ == '__main__': 284 logging.basicConfig(level=logging.DEBUG) 285 unittest.main() 286