1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""This module provides utilities to do audio data analysis.""" 6 7import logging 8import numpy 9import operator 10 11# Only peaks with coefficient greater than 0.01 of the first peak should be 12# considered. Note that this correspond to -40dB in the spectrum. 13DEFAULT_MIN_PEAK_RATIO = 0.01 14 15PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection. 16 17# The minimum RMS value of meaningful audio data. 18MEANINGFUL_RMS_THRESHOLD = 0.001 19 20class RMSTooSmallError(Exception): 21 """Error when signal RMS is too small.""" 22 pass 23 24 25class EmptyDataError(Exception): 26 """Error when signal is empty.""" 27 28 29def normalize_signal(signal, saturate_value): 30 """Normalizes the signal with respect to the saturate value. 31 32 @param signal: A list for one-channel PCM data. 33 @param saturate_value: The maximum value that the PCM data might be. 34 35 @returns: A numpy array containing normalized signal. The normalized signal 36 has value -1 and 1 when it saturates. 37 38 """ 39 signal = numpy.array(signal) 40 return signal / float(saturate_value) 41 42 43def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO, 44 peak_window_size_hz=PEAK_WINDOW_SIZE_HZ): 45 """Gets the dominant frequencies by spectral analysis. 46 47 @param signal: A list of numbers for one-channel PCM data. This should be 48 normalized to [-1, 1] so the function can check if signal RMS 49 is too small to be meaningful. 50 @param rate: Sampling rate. 51 @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the 52 peaks other than the greatest one should be 53 considered. 54 This is to ignore peaks that are too small compared 55 to the first peak peak_0. 56 @param peak_window_size_hz: The window size in Hz to find the peaks. 57 The minimum differences between found peaks will 58 be half of this value. 59 60 @returns: A list of tuples: 61 [(peak_frequency_0, peak_coefficient_0), 62 (peak_frequency_1, peak_coefficient_1), 63 (peak_frequency_2, peak_coefficient_2), ...] 64 where the tuples are sorted by coefficients. 65 The last peak_coefficient will be no less than 66 peak_coefficient * min_peak_ratio. 67 68 """ 69 # Checks the signal is meaningful. 70 if len(signal) == 0: 71 raise EmptyDataError('Signal data is empty') 72 73 signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal)) 74 logging.debug('signal RMS = %s', signal_rms) 75 if signal_rms < MEANINGFUL_RMS_THRESHOLD: 76 raise RMSTooSmallError( 77 'RMS %s is too small to be meaningful' % signal_rms) 78 79 logging.debug('Doing spectral analysis ...') 80 81 # First, pass signal through a window function to mitigate spectral leakage. 82 y_conv_w = signal * numpy.hanning(len(signal)) 83 84 length = len(y_conv_w) 85 86 # x_f is the frequency in Hz, y_f is the transformed coefficient. 87 x_f = _rfft_freq(length, rate) 88 y_f = 2.0 / length * numpy.fft.rfft(y_conv_w) 89 90 # y_f is complex so consider its absolute value for magnitude. 91 abs_y_f = numpy.abs(y_f) 92 threshold = max(abs_y_f) * min_peak_ratio 93 94 # Suppresses all coefficients that are below threshold. 95 for i in xrange(len(abs_y_f)): 96 if abs_y_f[i] < threshold: 97 abs_y_f[i] = 0 98 99 # Gets the peak detection window size in indice. 100 # x_f[1] is the frequency difference per index. 101 peak_window_size = int(peak_window_size_hz / x_f[1]) 102 103 # Detects peaks. 104 peaks = peak_detection(abs_y_f, peak_window_size) 105 106 # Transform back the peak location from index to frequency. 107 results = [] 108 for index, value in peaks: 109 results.append((x_f[index], value)) 110 return results 111 112 113def _rfft_freq(length, rate): 114 """Gets the frequency at each index of real FFT. 115 116 @param length: The window length of FFT. 117 @param rate: Sampling rate. 118 119 @returns: A numpy array containing frequency corresponding to 120 numpy.fft.rfft result at each index. 121 122 """ 123 # The difference in Hz between each index. 124 val = rate / float(length) 125 # Only care half of frequencies for FFT on real signal. 126 result_length = length // 2 + 1 127 return numpy.linspace(0, (result_length - 1) * val, result_length) 128 129 130def peak_detection(array, window_size): 131 """Detects peaks in an array. 132 133 A point (i, array[i]) is a peak if array[i] is the maximum among 134 array[i - half_window_size] to array[i + half_window_size]. 135 If array[i - half_window_size] to array[i + half_window_size] are all equal, 136 then there is no peak in this window. 137 Note that we only consider peak with value greater than 0. 138 139 @param window_size: The window to detect peaks. 140 141 @returns: A list of tuples: 142 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...] 143 where the tuples are sorted by peak values. 144 145 """ 146 half_window_size = window_size / 2 147 length = len(array) 148 149 def mid_is_peak(array, mid, left, right): 150 """Checks if value at mid is the largest among left to right in array. 151 152 @param array: A list of numbers. 153 @param mid: The mid index. 154 @param left: The left index. 155 @param rigth: The right index. 156 157 @returns: A tuple (is_peak, next_candidate) 158 is_peak is True if array[index] is the maximum among numbers 159 in array between index [left, right] inclusively. 160 next_candidate is the index of next candidate for peak if 161 is_peak is False. It is the index of maximum value in 162 [mid + 1, right]. If is_peak is True, next_candidate is 163 right + 1. 164 165 """ 166 value_mid = array[mid] 167 is_peak = True 168 next_peak_candidate_index = None 169 170 # Check the left half window. 171 for index in xrange(left, mid): 172 if array[index] >= value_mid: 173 is_peak = False 174 break 175 176 # Mid is at the end of array. 177 if mid == right: 178 return is_peak, right + 1 179 180 # Check the right half window and also record next candidate. 181 # Favor the larger index for next_peak_candidate_index. 182 for index in xrange(right, mid, -1): 183 if (next_peak_candidate_index is None or 184 array[index] > array[next_peak_candidate_index]): 185 next_peak_candidate_index = index 186 187 if array[next_peak_candidate_index] >= value_mid: 188 is_peak = False 189 190 if is_peak: 191 next_peak_candidate_index = right + 1 192 193 return is_peak, next_peak_candidate_index 194 195 results = [] 196 mid = 0 197 next_candidate_idx = None 198 while mid < length: 199 left = max(0, mid - half_window_size) 200 right = min(length - 1, mid + half_window_size) 201 202 # Only consider value greater than 0. 203 if array[mid] == 0: 204 mid = mid + 1 205 continue; 206 207 is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right) 208 209 if is_peak: 210 results.append((mid, array[mid])) 211 212 # Use the next candidate found in [mid + 1, right], or right + 1. 213 mid = next_candidate_idx 214 215 # Sort the peaks by values. 216 return sorted(results, key=lambda x: x[1], reverse=True) 217 218 219# The default pattern mathing threshold. By experiment, this threshold 220# can tolerate normal noise of 0.3 amplitude when sine wave signal 221# amplitude is 1. 222PATTERN_MATCHING_THRESHOLD = 0.85 223 224# The default block size of pattern matching. 225ANOMALY_DETECTION_BLOCK_SIZE = 120 226 227def anomaly_detection(signal, rate, freq, 228 block_size=ANOMALY_DETECTION_BLOCK_SIZE, 229 threshold=PATTERN_MATCHING_THRESHOLD): 230 """Detects anomaly in a sine wave signal. 231 232 This method detects anomaly in a sine wave signal by matching 233 patterns of each block. 234 For each moving window of block in the test signal, checks if there 235 is any block in golden signal that is similar to this block of test signal. 236 If there is such a block in golden signal, then this block of test 237 signal is matched and there is no anomaly in this block of test signal. 238 If there is any block in test signal that is not matched, then this block 239 covers an anomaly. 240 The block of test signal starts from index 0, and proceeds in steps of 241 half block size. The overlapping of test signal blocks makes sure there must 242 be at least one block covering the transition from sine wave to anomaly. 243 244 @param signal: A 1-D array-like object for 1-channel PCM data. 245 @param rate: The sampling rate. 246 @param freq: The expected frequency of signal. 247 @param block_size: The block size in samples to detect anomaly. 248 @param threshold: The threshold of correlation index to be judge as matched. 249 250 @returns: A list containing detected anomaly time in seconds. 251 252 """ 253 if len(signal) == 0: 254 raise EmptyDataError('Signal data is empty') 255 256 golden_y = _generate_golden_pattern(rate, freq, block_size) 257 258 results = [] 259 260 for start in xrange(0, len(signal), block_size / 2): 261 end = start + block_size 262 test_signal = signal[start:end] 263 matched = _moving_pattern_matching(golden_y, test_signal, threshold) 264 if not matched: 265 results.append(start) 266 267 results = [float(x) / rate for x in results] 268 269 return results 270 271 272def _generate_golden_pattern(rate, freq, block_size): 273 """Generates a golden pattern of certain frequency. 274 275 The golden pattern must cover all the possibilities of waveforms in a 276 block. So, we need a golden pattern covering 1 period + 1 block size, 277 such that the test block can start anywhere in a period, and extends 278 a block size. 279 280 |period |1 bk| 281 | | | 282 . . . . 283 . . . . 284 . . . 285 286 @param rate: The sampling rate. 287 @param freq: The frequency of golden pattern. 288 @param block_size: The block size in samples to detect anomaly. 289 290 @returns: A 1-D array for golden pattern. 291 292 """ 293 samples_in_a_period = int(rate / freq) + 1 294 samples_in_golden_pattern = samples_in_a_period + block_size 295 golden_x = numpy.linspace( 296 0.0, (samples_in_golden_pattern - 1) * 1.0 / rate, 297 samples_in_golden_pattern) 298 golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x) 299 return golden_y 300 301 302def _moving_pattern_matching(golden_signal, test_signal, threshold): 303 """Checks if test_signal is similar to any block of golden_signal. 304 305 Compares test signal with each block of golden signal by correlation 306 index. If there is any block of golden signal that is similar to 307 test signal, then it is matched. 308 309 @param golden_signal: A 1-D array for golden signal. 310 @param test_signal: A 1-D array for test signal. 311 @param threshold: The threshold of correlation index to be judge as matched. 312 313 @returns: True if there is a match. False otherwise. 314 315 @raises: ValueError: if test signal is longer than golden signal. 316 317 """ 318 if len(golden_signal) < len(test_signal): 319 raise ValueError('Test signal is longer than golden signal') 320 321 block_length = len(test_signal) 322 number_of_movings = len(golden_signal) - block_length + 1 323 correlation_indices = [] 324 for moving_index in xrange(number_of_movings): 325 # Cuts one block of golden signal from start index. 326 # The block length is the same as test signal. 327 start = moving_index 328 end = start + block_length 329 golden_signal_block = golden_signal[start:end] 330 try: 331 correlation_index = _get_correlation_index( 332 golden_signal_block, test_signal) 333 except TestSignalNormTooSmallError: 334 logging.info('Caught one block of test signal that has no meaningful norm') 335 return False 336 correlation_indices.append(correlation_index) 337 338 # Checks if the maximum correlation index is high enough. 339 max_corr = max(correlation_indices) 340 if max_corr < threshold: 341 logging.debug('Got one unmatched block with max_corr: %s', max_corr) 342 return False 343 return True 344 345 346class GoldenSignalNormTooSmallError(Exception): 347 """Exception when golden signal norm is too small.""" 348 pass 349 350 351class TestSignalNormTooSmallError(Exception): 352 """Exception when test signal norm is too small.""" 353 pass 354 355 356_MINIMUM_SIGNAL_NORM = 0.001 357 358def _get_correlation_index(golden_signal, test_signal): 359 """Computes correlation index of two signal of same length. 360 361 @param golden_signal: An 1-D array-like object. 362 @param test_signal: An 1-D array-like object. 363 364 @raises: ValueError: if two signal have different lengths. 365 @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small 366 @raises: TestSignalNormTooSmallError: if test signal norm is too small. 367 368 @returns: The correlation index. 369 """ 370 if len(golden_signal) != len(test_signal): 371 raise ValueError( 372 'Only accepts signal of same length: %s, %s' % ( 373 len(golden_signal), len(test_signal))) 374 375 norm_golden = numpy.linalg.norm(golden_signal) 376 norm_test = numpy.linalg.norm(test_signal) 377 if norm_golden <= _MINIMUM_SIGNAL_NORM: 378 raise GoldenSignalNormTooSmallError( 379 'No meaningful data as norm is too small.') 380 if norm_test <= _MINIMUM_SIGNAL_NORM: 381 raise TestSignalNormTooSmallError( 382 'No meaningful data as norm is too small.') 383 384 # The 'valid' cross correlation result of two signals of same length will 385 # contain only one number. 386 correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0] 387 return correlation / (norm_golden * norm_test) 388