1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""This module provides utilities to do audio data analysis.""" 6 7import logging 8import numpy 9import operator 10 11# Only peaks with coefficient greater than 0.01 of the first peak should be 12# considered. Note that this correspond to -40dB in the spectrum. 13DEFAULT_MIN_PEAK_RATIO = 0.01 14 15PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection. 16 17# The minimum RMS value of meaningful audio data. 18MEANINGFUL_RMS_THRESHOLD = 0.001 19 20class RMSTooSmallError(Exception): 21 """Error when signal RMS is too small.""" 22 pass 23 24 25class EmptyDataError(Exception): 26 """Error when signal is empty.""" 27 28 29def normalize_signal(signal, saturate_value): 30 """Normalizes the signal with respect to the saturate value. 31 32 @param signal: A list for one-channel PCM data. 33 @param saturate_value: The maximum value that the PCM data might be. 34 35 @returns: A numpy array containing normalized signal. The normalized signal 36 has value -1 and 1 when it saturates. 37 38 """ 39 signal = numpy.array(signal) 40 return signal / float(saturate_value) 41 42 43def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO, 44 peak_window_size_hz=PEAK_WINDOW_SIZE_HZ): 45 """Gets the dominant frequencies by spectral analysis. 46 47 @param signal: A list of numbers for one-channel PCM data. This should be 48 normalized to [-1, 1] so the function can check if signal RMS 49 is too small to be meaningful. 50 @param rate: Sampling rate. 51 @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the 52 peaks other than the greatest one should be 53 considered. 54 This is to ignore peaks that are too small compared 55 to the first peak peak_0. 56 @param peak_window_size_hz: The window size in Hz to find the peaks. 57 The minimum differences between found peaks will 58 be half of this value. 59 60 @returns: A list of tuples: 61 [(peak_frequency_0, peak_coefficient_0), 62 (peak_frequency_1, peak_coefficient_1), 63 (peak_frequency_2, peak_coefficient_2), ...] 64 where the tuples are sorted by coefficients. 65 The last peak_coefficient will be no less than 66 peak_coefficient * min_peak_ratio. 67 68 """ 69 # Checks the signal is meaningful. 70 if len(signal) == 0: 71 raise EmptyDataError('Signal data is empty') 72 73 signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal)) 74 logging.debug('signal RMS = %s', signal_rms) 75 if signal_rms < MEANINGFUL_RMS_THRESHOLD: 76 raise RMSTooSmallError( 77 'RMS %s is too small to be meaningful' % signal_rms) 78 79 # First, pass signal through a window function to mitigate spectral leakage. 80 y_conv_w = signal * numpy.hanning(len(signal)) 81 82 length = len(y_conv_w) 83 84 # x_f is the frequency in Hz, y_f is the transformed coefficient. 85 x_f = _rfft_freq(length, rate) 86 y_f = 2.0 / length * numpy.fft.rfft(y_conv_w) 87 88 # y_f is complex so consider its absolute value for magnitude. 89 abs_y_f = numpy.abs(y_f) 90 threshold = max(abs_y_f) * min_peak_ratio 91 92 # Suppresses all coefficients that are below threshold. 93 for i in xrange(len(abs_y_f)): 94 if abs_y_f[i] < threshold: 95 abs_y_f[i] = 0 96 97 # Gets the peak detection window size in indice. 98 # x_f[1] is the frequency difference per index. 99 peak_window_size = int(peak_window_size_hz / x_f[1]) 100 101 # Detects peaks. 102 peaks = _peak_detection(abs_y_f, peak_window_size) 103 104 # Transform back the peak location from index to frequency. 105 results = [] 106 for index, value in peaks: 107 results.append((x_f[index], value)) 108 return results 109 110 111def _rfft_freq(length, rate): 112 """Gets the frequency at each index of real FFT. 113 114 @param length: The window length of FFT. 115 @param rate: Sampling rate. 116 117 @returns: A numpy array containing frequency corresponding to 118 numpy.fft.rfft result at each index. 119 120 """ 121 # The difference in Hz between each index. 122 val = rate / float(length) 123 # Only care half of frequencies for FFT on real signal. 124 result_length = length // 2 + 1 125 return numpy.linspace(0, (result_length - 1) * val, result_length) 126 127 128def _peak_detection(array, window_size): 129 """Detects peaks in an array. 130 131 A point (i, array[i]) is a peak if array[i] is the maximum among 132 array[i - half_window_size] to array[i + half_window_size]. 133 If array[i - half_window_size] to array[i + half_window_size] are all equal, 134 then there is no peak in this window. 135 136 @param window_size: The window to detect peaks. 137 138 @returns: A list of tuples: 139 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...] 140 where the tuples are sorted by peak values. 141 142 """ 143 half_window_size = window_size / 2 144 length = len(array) 145 146 def find_max(numbers): 147 """Gets the index where maximum value happens. 148 149 @param numbers: A list of numbers. 150 151 @returns: (index, value) where value = numbers[index] is the maximum 152 among numbers. 153 154 """ 155 index, value = max(enumerate(numbers), key=lambda x: x[1]) 156 return index, value 157 158 results = [] 159 for mid in xrange(length): 160 left = max(0, mid - half_window_size) 161 right = min(length - 1, mid + half_window_size) 162 numbers_in_window = array[left:right + 1] 163 max_index, max_value = find_max(numbers_in_window) 164 165 # Add the offset back. 166 max_index = max_index + left 167 168 # If all values are the same then there is no peak in this window. 169 if max_value != min(numbers_in_window) and max_index == mid: 170 results.append((mid, max_value)) 171 172 # Sort the peaks by values. 173 return sorted(results, key=lambda x: x[1], reverse=True) 174 175 176# The default pattern mathing threshold. By experiment, this threshold 177# can tolerate normal noise of 0.3 amplitude when sine wave signal 178# amplitude is 1. 179PATTERN_MATCHING_THRESHOLD = 0.85 180 181# The default block size of pattern matching. 182ANOMALY_DETECTION_BLOCK_SIZE = 120 183 184def anomaly_detection(signal, rate, freq, 185 block_size=ANOMALY_DETECTION_BLOCK_SIZE, 186 threshold=PATTERN_MATCHING_THRESHOLD): 187 """Detects anomaly in a sine wave signal. 188 189 This method detects anomaly in a sine wave signal by matching 190 patterns of each block. 191 For each moving window of block in the test signal, checks if there 192 is any block in golden signal that is similar to this block of test signal. 193 If there is such a block in golden signal, then this block of test 194 signal is matched and there is no anomaly in this block of test signal. 195 If there is any block in test signal that is not matched, then this block 196 covers an anomaly. 197 The block of test signal starts from index 0, and proceeds in steps of 198 half block size. The overlapping of test signal blocks makes sure there must 199 be at least one block covering the transition from sine wave to anomaly. 200 201 @param signal: A 1-D array-like object for 1-channel PCM data. 202 @param rate: The sampling rate. 203 @param freq: The expected frequency of signal. 204 @param block_size: The block size in samples to detect anomaly. 205 @param threshold: The threshold of correlation index to be judge as matched. 206 207 @returns: A list containing detected anomaly time in seconds. 208 209 """ 210 if len(signal) == 0: 211 raise EmptyDataError('Signal data is empty') 212 213 golden_y = _generate_golden_pattern(rate, freq, block_size) 214 215 results = [] 216 217 for start in xrange(0, len(signal), block_size / 2): 218 end = start + block_size 219 test_signal = signal[start:end] 220 matched = _moving_pattern_matching(golden_y, test_signal, threshold) 221 if not matched: 222 results.append(start) 223 224 results = [float(x) / rate for x in results] 225 226 return results 227 228 229def _generate_golden_pattern(rate, freq, block_size): 230 """Generates a golden pattern of certain frequency. 231 232 The golden pattern must cover all the possibilities of waveforms in a 233 block. So, we need a golden pattern covering 1 period + 1 block size, 234 such that the test block can start anywhere in a period, and extends 235 a block size. 236 237 |period |1 bk| 238 | | | 239 . . . . 240 . . . . 241 . . . 242 243 @param rate: The sampling rate. 244 @param freq: The frequency of golden pattern. 245 @param block_size: The block size in samples to detect anomaly. 246 247 @returns: A 1-D array for golden pattern. 248 249 """ 250 samples_in_a_period = int(rate / freq) + 1 251 samples_in_golden_pattern = samples_in_a_period + block_size 252 golden_x = numpy.linspace( 253 0.0, (samples_in_golden_pattern - 1) * 1.0 / rate, 254 samples_in_golden_pattern) 255 golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x) 256 return golden_y 257 258 259def _moving_pattern_matching(golden_signal, test_signal, threshold): 260 """Checks if test_signal is similar to any block of golden_signal. 261 262 Compares test signal with each block of golden signal by correlation 263 index. If there is any block of golden signal that is similar to 264 test signal, then it is matched. 265 266 @param golden_signal: A 1-D array for golden signal. 267 @param test_signal: A 1-D array for test signal. 268 @param threshold: The threshold of correlation index to be judge as matched. 269 270 @returns: True if there is a match. False otherwise. 271 272 @raises: ValueError: if test signal is longer than golden signal. 273 274 """ 275 if len(golden_signal) < len(test_signal): 276 raise ValueError('Test signal is longer than golden signal') 277 278 block_length = len(test_signal) 279 number_of_movings = len(golden_signal) - block_length + 1 280 correlation_indices = [] 281 for moving_index in xrange(number_of_movings): 282 # Cuts one block of golden signal from start index. 283 # The block length is the same as test signal. 284 start = moving_index 285 end = start + block_length 286 golden_signal_block = golden_signal[start:end] 287 try: 288 correlation_index = _get_correlation_index( 289 golden_signal_block, test_signal) 290 except TestSignalNormTooSmallError: 291 logging.info('Caught one block of test signal that has no meaningful norm') 292 return False 293 correlation_indices.append(correlation_index) 294 295 # Checks if the maximum correlation index is high enough. 296 max_corr = max(correlation_indices) 297 if max_corr < threshold: 298 logging.debug('Got one unmatched block with max_corr: %s', max_corr) 299 return False 300 return True 301 302 303class GoldenSignalNormTooSmallError(Exception): 304 """Exception when golden signal norm is too small.""" 305 pass 306 307 308class TestSignalNormTooSmallError(Exception): 309 """Exception when test signal norm is too small.""" 310 pass 311 312 313_MINIMUM_SIGNAL_NORM = 0.001 314 315def _get_correlation_index(golden_signal, test_signal): 316 """Computes correlation index of two signal of same length. 317 318 @param golden_signal: An 1-D array-like object. 319 @param test_signal: An 1-D array-like object. 320 321 @raises: ValueError: if two signal have different lengths. 322 @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small 323 @raises: TestSignalNormTooSmallError: if test signal norm is too small. 324 325 @returns: The correlation index. 326 """ 327 if len(golden_signal) != len(test_signal): 328 raise ValueError( 329 'Only accepts signal of same length: %s, %s' % ( 330 len(golden_signal), len(test_signal))) 331 332 norm_golden = numpy.linalg.norm(golden_signal) 333 norm_test = numpy.linalg.norm(test_signal) 334 if norm_golden <= _MINIMUM_SIGNAL_NORM: 335 raise GoldenSignalNormTooSmallError( 336 'No meaningful data as norm is too small.') 337 if norm_test <= _MINIMUM_SIGNAL_NORM: 338 raise TestSignalNormTooSmallError( 339 'No meaningful data as norm is too small.') 340 341 # The 'valid' cross correlation result of two signals of same length will 342 # contain only one number. 343 correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0] 344 return correlation / (norm_golden * norm_test) 345