1#!/usr/bin/env python3.4 2# 3# Copyright 2017 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16"""This module provides utilities to do audio data analysis.""" 17 18import logging 19import numpy 20import operator 21 22# The default block size of pattern matching. 23ANOMALY_DETECTION_BLOCK_SIZE = 120 24 25# Only peaks with coefficient greater than 0.01 of the first peak should be 26# considered. Note that this correspond to -40dB in the spectrum. 27DEFAULT_MIN_PEAK_RATIO = 0.01 28 29# The minimum RMS value of meaningful audio data. 30MEANINGFUL_RMS_THRESHOLD = 0.001 31 32# The minimal signal norm value. 33_MINIMUM_SIGNAL_NORM = 0.001 34 35# The default pattern mathing threshold. By experiment, this threshold 36# can tolerate normal noise of 0.3 amplitude when sine wave signal 37# amplitude is 1. 38PATTERN_MATCHING_THRESHOLD = 0.85 39 40# Window size for peak detection. 41PEAK_WINDOW_SIZE_HZ = 20 42 43 44class RMSTooSmallError(Exception): 45 """Error when signal RMS is too small.""" 46 pass 47 48 49class EmptyDataError(Exception): 50 """Error when signal is empty.""" 51 pass 52 53 54def normalize_signal(signal, saturate_value): 55 """Normalizes the signal with respect to the saturate value. 56 57 Args: 58 signal: A list for one-channel PCM data. 59 saturate_value: The maximum value that the PCM data might be. 60 61 Returns: 62 A numpy array containing normalized signal. The normalized signal has 63 value -1 and 1 when it saturates. 64 65 """ 66 signal = numpy.array(signal) 67 return signal / float(saturate_value) 68 69 70def spectral_analysis(signal, 71 rate, 72 min_peak_ratio=DEFAULT_MIN_PEAK_RATIO, 73 peak_window_size_hz=PEAK_WINDOW_SIZE_HZ): 74 """Gets the dominant frequencies by spectral analysis. 75 76 Args: 77 signal: A list of numbers for one-channel PCM data. This should be 78 normalized to [-1, 1] so the function can check if signal RMS 79 is too small to be meaningful. 80 rate: Sampling rate in samples per second. Example inputs: 44100, 81 48000 82 min_peak_ratio: The minimum peak_i/peak_0 ratio such that the 83 peaks other than the greatest one should be 84 considered. 85 This is to ignore peaks that are too small compared 86 to the first peak peak_0. 87 peak_window_size_hz: The window size in Hz to find the peaks. 88 The minimum differences between found peaks will 89 be half of this value. 90 91 Returns: 92 A list of tuples: 93 [(peak_frequency_0, peak_coefficient_0), 94 (peak_frequency_1, peak_coefficient_1), 95 (peak_frequency_2, peak_coefficient_2), ...] 96 where the tuples are sorted by coefficients. The last 97 peak_coefficient will be no less than peak_coefficient * 98 min_peak_ratio. If RMS is less than MEANINGFUL_RMS_THRESHOLD, 99 returns [(0, 0)]. 100 101 """ 102 # Checks the signal is meaningful. 103 if len(signal) == 0: 104 raise EmptyDataError('Signal data is empty') 105 106 signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal)) 107 logging.debug('signal RMS = %s', signal_rms) 108 109 # If RMS is too small, set dominant frequency and coefficient to 0. 110 if signal_rms < MEANINGFUL_RMS_THRESHOLD: 111 logging.warning( 112 'RMS %s is too small to be meaningful. Set frequency to 0.', 113 signal_rms) 114 return [(0, 0)] 115 116 logging.debug('Doing spectral analysis ...') 117 118 # First, pass signal through a window function to mitigate spectral leakage. 119 y_conv_w = signal * numpy.hanning(len(signal)) 120 121 length = len(y_conv_w) 122 123 # x_f is the frequency in Hz, y_f is the transformed coefficient. 124 x_f = _rfft_freq(length, rate) 125 y_f = 2.0 / length * numpy.fft.rfft(y_conv_w) 126 127 # y_f is complex so consider its absolute value for magnitude. 128 abs_y_f = numpy.abs(y_f) 129 threshold = max(abs_y_f) * min_peak_ratio 130 131 # Suppresses all coefficients that are below threshold. 132 for i in range(len(abs_y_f)): 133 if abs_y_f[i] < threshold: 134 abs_y_f[i] = 0 135 136 # Gets the peak detection window size in indice. 137 # x_f[1] is the frequency difference per index. 138 peak_window_size = int(peak_window_size_hz / x_f[1]) 139 140 # Detects peaks. 141 peaks = peak_detection(abs_y_f, peak_window_size) 142 143 # Transform back the peak location from index to frequency. 144 results = [] 145 for index, value in peaks: 146 results.append((x_f[int(index)], value)) 147 return results 148 149 150def _rfft_freq(length, rate): 151 """Gets the frequency at each index of real FFT. 152 153 Args: 154 length: The window length of FFT. 155 rate: Sampling rate in samples per second. Example inputs: 44100, 156 48000 157 158 Returns: 159 A numpy array containing frequency corresponding to numpy.fft.rfft 160 result at each index. 161 162 """ 163 # The difference in Hz between each index. 164 val = rate / float(length) 165 # Only care half of frequencies for FFT on real signal. 166 result_length = length // 2 + 1 167 return numpy.linspace(0, (result_length - 1) * val, result_length) 168 169 170def peak_detection(array, window_size): 171 """Detects peaks in an array. 172 173 A point (i, array[i]) is a peak if array[i] is the maximum among 174 array[i - half_window_size] to array[i + half_window_size]. 175 If array[i - half_window_size] to array[i + half_window_size] are all equal, 176 then there is no peak in this window. 177 Note that we only consider peak with value greater than 0. 178 179 Args: 180 array: The input array to detect peaks in. Array is a list of 181 absolute values of the magnitude of transformed coefficient. 182 183 window_size: The window to detect peaks. 184 185 Returns: 186 A list of tuples: 187 [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...] 188 where the tuples are sorted by peak values. 189 190 """ 191 half_window_size = window_size / 2 192 length = len(array) 193 194 def mid_is_peak(array, mid, left, right): 195 """Checks if value at mid is the largest among left to right in array. 196 197 Args: 198 array: A list of numbers. 199 mid: The mid index. 200 left: The left index. 201 rigth: The right index. 202 203 Returns: 204 A tuple (is_peak, next_candidate) 205 is_peak is True if array[index] is the maximum among numbers 206 in array between index [left, right] inclusively. 207 next_candidate is the index of next candidate for peak if 208 is_peak is False. It is the index of maximum value in 209 [mid + 1, right]. If is_peak is True, next_candidate is 210 right + 1. 211 212 """ 213 value_mid = array[int(mid)] 214 is_peak = True 215 next_peak_candidate_index = None 216 217 # Check the left half window. 218 for index in range(int(left), int(mid)): 219 if array[index] >= value_mid: 220 is_peak = False 221 break 222 223 # Mid is at the end of array. 224 if mid == right: 225 return is_peak, right + 1 226 227 # Check the right half window and also record next candidate. 228 # Favor the larger index for next_peak_candidate_index. 229 for index in range(int(right), int(mid), -1): 230 if (next_peak_candidate_index is None or 231 array[index] > array[next_peak_candidate_index]): 232 next_peak_candidate_index = index 233 234 if array[next_peak_candidate_index] >= value_mid: 235 is_peak = False 236 237 if is_peak: 238 next_peak_candidate_index = right + 1 239 240 return is_peak, next_peak_candidate_index 241 242 results = [] 243 mid = 0 244 next_candidate_idx = None 245 while mid < length: 246 left = max(0, mid - half_window_size) 247 right = min(length - 1, mid + half_window_size) 248 249 # Only consider value greater than 0. 250 if array[int(mid)] == 0: 251 mid = mid + 1 252 continue 253 254 is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right) 255 256 if is_peak: 257 results.append((mid, array[int(mid)])) 258 259 # Use the next candidate found in [mid + 1, right], or right + 1. 260 mid = next_candidate_idx 261 262 # Sort the peaks by values. 263 return sorted(results, key=lambda x: x[1], reverse=True) 264 265 266def anomaly_detection(signal, 267 rate, 268 freq, 269 block_size=ANOMALY_DETECTION_BLOCK_SIZE, 270 threshold=PATTERN_MATCHING_THRESHOLD): 271 """Detects anomaly in a sine wave signal. 272 273 This method detects anomaly in a sine wave signal by matching 274 patterns of each block. 275 For each moving window of block in the test signal, checks if there 276 is any block in golden signal that is similar to this block of test signal. 277 If there is such a block in golden signal, then this block of test 278 signal is matched and there is no anomaly in this block of test signal. 279 If there is any block in test signal that is not matched, then this block 280 covers an anomaly. 281 The block of test signal starts from index 0, and proceeds in steps of 282 half block size. The overlapping of test signal blocks makes sure there must 283 be at least one block covering the transition from sine wave to anomaly. 284 285 Args: 286 signal: A 1-D array-like object for 1-channel PCM data. 287 rate: Sampling rate in samples per second. Example inputs: 44100, 288 48000 289 freq: The expected frequency of signal. 290 block_size: The block size in samples to detect anomaly. 291 threshold: The threshold of correlation index to be judge as matched. 292 293 Returns: 294 A list containing detected anomaly time in seconds. 295 296 """ 297 if len(signal) == 0: 298 raise EmptyDataError('Signal data is empty') 299 300 golden_y = _generate_golden_pattern(rate, freq, block_size) 301 302 results = [] 303 304 for start in range(0, len(signal), int(block_size / 2)): 305 end = start + block_size 306 test_signal = signal[start:end] 307 matched = _moving_pattern_matching(golden_y, test_signal, threshold) 308 if not matched: 309 results.append(start) 310 311 results = [float(x) / rate for x in results] 312 313 return results 314 315 316def _generate_golden_pattern(rate, freq, block_size): 317 """Generates a golden pattern of certain frequency. 318 319 The golden pattern must cover all the possibilities of waveforms in a 320 block. So, we need a golden pattern covering 1 period + 1 block size, 321 such that the test block can start anywhere in a period, and extends 322 a block size. 323 324 |period |1 bk| 325 | | | 326 . . . . 327 . . . . 328 . . . 329 330 Args: 331 rate: Sampling rate in samples per second. Example inputs: 44100, 332 48000 333 freq: The frequency of golden pattern. 334 block_size: The block size in samples to detect anomaly. 335 336 Returns: 337 A 1-D array for golden pattern. 338 339 """ 340 samples_in_a_period = int(rate / freq) + 1 341 samples_in_golden_pattern = samples_in_a_period + block_size 342 golden_x = numpy.linspace(0.0, (samples_in_golden_pattern - 1) * 1.0 / 343 rate, samples_in_golden_pattern) 344 golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x) 345 return golden_y 346 347 348def _moving_pattern_matching(golden_signal, test_signal, threshold): 349 """Checks if test_signal is similar to any block of golden_signal. 350 351 Compares test signal with each block of golden signal by correlation 352 index. If there is any block of golden signal that is similar to 353 test signal, then it is matched. 354 355 Args: 356 golden_signal: A 1-D array for golden signal. 357 test_signal: A 1-D array for test signal. 358 threshold: The threshold of correlation index to be judge as matched. 359 360 Returns: 361 True if there is a match. False otherwise. 362 363 ValueError: if test signal is longer than golden signal. 364 365 """ 366 if len(golden_signal) < len(test_signal): 367 raise ValueError('Test signal is longer than golden signal') 368 369 block_length = len(test_signal) 370 number_of_movings = len(golden_signal) - block_length + 1 371 correlation_indices = [] 372 for moving_index in range(number_of_movings): 373 # Cuts one block of golden signal from start index. 374 # The block length is the same as test signal. 375 start = moving_index 376 end = start + block_length 377 golden_signal_block = golden_signal[start:end] 378 try: 379 correlation_index = _get_correlation_index(golden_signal_block, 380 test_signal) 381 except TestSignalNormTooSmallError: 382 logging.info( 383 'Caught one block of test signal that has no meaningful norm') 384 return False 385 correlation_indices.append(correlation_index) 386 387 # Checks if the maximum correlation index is high enough. 388 max_corr = max(correlation_indices) 389 if max_corr < threshold: 390 logging.debug('Got one unmatched block with max_corr: %s', max_corr) 391 return False 392 return True 393 394 395class GoldenSignalNormTooSmallError(Exception): 396 """Exception when golden signal norm is too small.""" 397 pass 398 399 400class TestSignalNormTooSmallError(Exception): 401 """Exception when test signal norm is too small.""" 402 pass 403 404 405def _get_correlation_index(golden_signal, test_signal): 406 """Computes correlation index of two signal of same length. 407 408 Args: 409 golden_signal: An 1-D array-like object. 410 test_signal: An 1-D array-like object. 411 412 Raises: 413 ValueError: if two signal have different lengths. 414 GoldenSignalNormTooSmallError: if golden signal norm is too small 415 TestSignalNormTooSmallError: if test signal norm is too small. 416 417 Returns: 418 The correlation index. 419 """ 420 if len(golden_signal) != len(test_signal): 421 raise ValueError('Only accepts signal of same length: %s, %s' % 422 (len(golden_signal), len(test_signal))) 423 424 norm_golden = numpy.linalg.norm(golden_signal) 425 norm_test = numpy.linalg.norm(test_signal) 426 if norm_golden <= _MINIMUM_SIGNAL_NORM: 427 raise GoldenSignalNormTooSmallError( 428 'No meaningful data as norm is too small.') 429 if norm_test <= _MINIMUM_SIGNAL_NORM: 430 raise TestSignalNormTooSmallError( 431 'No meaningful data as norm is too small.') 432 433 # The 'valid' cross correlation result of two signals of same length will 434 # contain only one number. 435 correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0] 436 return correlation / (norm_golden * norm_test) 437