1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides utilities to do audio data analysis."""
6
7import logging
8import numpy
9import operator
10
11# Only peaks with coefficient greater than 0.01 of the first peak should be
12# considered. Note that this correspond to -40dB in the spectrum.
13DEFAULT_MIN_PEAK_RATIO = 0.01
14
15PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection.
16
17# The minimum RMS value of meaningful audio data.
18MEANINGFUL_RMS_THRESHOLD = 0.001
19
20class RMSTooSmallError(Exception):
21    """Error when signal RMS is too small."""
22    pass
23
24
25class EmptyDataError(Exception):
26    """Error when signal is empty."""
27
28
29def normalize_signal(signal, saturate_value):
30    """Normalizes the signal with respect to the saturate value.
31
32    @param signal: A list for one-channel PCM data.
33    @param saturate_value: The maximum value that the PCM data might be.
34
35    @returns: A numpy array containing normalized signal. The normalized signal
36              has value -1 and 1 when it saturates.
37
38    """
39    signal = numpy.array(signal)
40    return signal / float(saturate_value)
41
42
43def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
44                      peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
45    """Gets the dominant frequencies by spectral analysis.
46
47    @param signal: A list of numbers for one-channel PCM data. This should be
48                   normalized to [-1, 1] so the function can check if signal RMS
49                   is too small to be meaningful.
50    @param rate: Sampling rate.
51    @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the
52                           peaks other than the greatest one should be
53                           considered.
54                           This is to ignore peaks that are too small compared
55                           to the first peak peak_0.
56    @param peak_window_size_hz: The window size in Hz to find the peaks.
57                                The minimum differences between found peaks will
58                                be half of this value.
59
60    @returns: A list of tuples:
61              [(peak_frequency_0, peak_coefficient_0),
62               (peak_frequency_1, peak_coefficient_1),
63               (peak_frequency_2, peak_coefficient_2), ...]
64              where the tuples are sorted by coefficients.
65              The last peak_coefficient will be no less than
66              peak_coefficient * min_peak_ratio.
67
68    """
69    # Checks the signal is meaningful.
70    if len(signal) == 0:
71        raise EmptyDataError('Signal data is empty')
72
73    signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
74    logging.debug('signal RMS = %s', signal_rms)
75    if signal_rms < MEANINGFUL_RMS_THRESHOLD:
76        raise RMSTooSmallError(
77                'RMS %s is too small to be meaningful' % signal_rms)
78
79    # First, pass signal through a window function to mitigate spectral leakage.
80    y_conv_w = signal * numpy.hanning(len(signal))
81
82    length = len(y_conv_w)
83
84    # x_f is the frequency in Hz, y_f is the transformed coefficient.
85    x_f = _rfft_freq(length, rate)
86    y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
87
88    # y_f is complex so consider its absolute value for magnitude.
89    abs_y_f = numpy.abs(y_f)
90    threshold = max(abs_y_f) * min_peak_ratio
91
92    # Suppresses all coefficients that are below threshold.
93    for i in xrange(len(abs_y_f)):
94        if abs_y_f[i] < threshold:
95            abs_y_f[i] = 0
96
97    # Gets the peak detection window size in indice.
98    # x_f[1] is the frequency difference per index.
99    peak_window_size = int(peak_window_size_hz / x_f[1])
100
101    # Detects peaks.
102    peaks = _peak_detection(abs_y_f, peak_window_size)
103
104    # Transform back the peak location from index to frequency.
105    results = []
106    for index, value in peaks:
107        results.append((x_f[index], value))
108    return results
109
110
111def _rfft_freq(length, rate):
112    """Gets the frequency at each index of real FFT.
113
114    @param length: The window length of FFT.
115    @param rate: Sampling rate.
116
117    @returns: A numpy array containing frequency corresponding to
118              numpy.fft.rfft result at each index.
119
120    """
121    # The difference in Hz between each index.
122    val = rate / float(length)
123    # Only care half of frequencies for FFT on real signal.
124    result_length = length // 2 + 1
125    return numpy.linspace(0, (result_length - 1) * val, result_length)
126
127
128def _peak_detection(array, window_size):
129    """Detects peaks in an array.
130
131    A point (i, array[i]) is a peak if array[i] is the maximum among
132    array[i - half_window_size] to array[i + half_window_size].
133    If array[i - half_window_size] to array[i + half_window_size] are all equal,
134    then there is no peak in this window.
135
136    @param window_size: The window to detect peaks.
137
138    @returns: A list of tuples:
139              [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
140              where the tuples are sorted by peak values.
141
142    """
143    half_window_size = window_size / 2
144    length = len(array)
145
146    def find_max(numbers):
147        """Gets the index where maximum value happens.
148
149        @param numbers: A list of numbers.
150
151        @returns: (index, value) where value = numbers[index] is the maximum
152                  among numbers.
153
154        """
155        index, value = max(enumerate(numbers), key=lambda x: x[1])
156        return index, value
157
158    results = []
159    for mid in xrange(length):
160        left = max(0, mid - half_window_size)
161        right = min(length - 1, mid + half_window_size)
162        numbers_in_window = array[left:right + 1]
163        max_index, max_value = find_max(numbers_in_window)
164
165        # Add the offset back.
166        max_index = max_index + left
167
168        # If all values are the same then there is no peak in this window.
169        if max_value != min(numbers_in_window) and max_index == mid:
170            results.append((mid, max_value))
171
172    # Sort the peaks by values.
173    return sorted(results, key=lambda x: x[1], reverse=True)
174
175
176# The default pattern mathing threshold. By experiment, this threshold
177# can tolerate normal noise of 0.3 amplitude when sine wave signal
178# amplitude is 1.
179PATTERN_MATCHING_THRESHOLD = 0.85
180
181# The default block size of pattern matching.
182ANOMALY_DETECTION_BLOCK_SIZE = 120
183
184def anomaly_detection(signal, rate, freq,
185                      block_size=ANOMALY_DETECTION_BLOCK_SIZE,
186                      threshold=PATTERN_MATCHING_THRESHOLD):
187    """Detects anomaly in a sine wave signal.
188
189    This method detects anomaly in a sine wave signal by matching
190    patterns of each block.
191    For each moving window of block in the test signal, checks if there
192    is any block in golden signal that is similar to this block of test signal.
193    If there is such a block in golden signal, then this block of test
194    signal is matched and there is no anomaly in this block of test signal.
195    If there is any block in test signal that is not matched, then this block
196    covers an anomaly.
197    The block of test signal starts from index 0, and proceeds in steps of
198    half block size. The overlapping of test signal blocks makes sure there must
199    be at least one block covering the transition from sine wave to anomaly.
200
201    @param signal: A 1-D array-like object for 1-channel PCM data.
202    @param rate: The sampling rate.
203    @param freq: The expected frequency of signal.
204    @param block_size: The block size in samples to detect anomaly.
205    @param threshold: The threshold of correlation index to be judge as matched.
206
207    @returns: A list containing detected anomaly time in seconds.
208
209    """
210    if len(signal) == 0:
211        raise EmptyDataError('Signal data is empty')
212
213    golden_y = _generate_golden_pattern(rate, freq, block_size)
214
215    results = []
216
217    for start in xrange(0, len(signal), block_size / 2):
218        end = start + block_size
219        test_signal = signal[start:end]
220        matched = _moving_pattern_matching(golden_y, test_signal, threshold)
221        if not matched:
222            results.append(start)
223
224    results = [float(x) / rate for x in results]
225
226    return results
227
228
229def _generate_golden_pattern(rate, freq, block_size):
230    """Generates a golden pattern of certain frequency.
231
232    The golden pattern must cover all the possibilities of waveforms in a
233    block. So, we need a golden pattern covering 1 period + 1 block size,
234    such that the test block can start anywhere in a period, and extends
235    a block size.
236
237    |period |1 bk|
238    |       |    |
239     . .     . .
240    .   .   .   .
241         . .     .
242
243    @param rate: The sampling rate.
244    @param freq: The frequency of golden pattern.
245    @param block_size: The block size in samples to detect anomaly.
246
247    @returns: A 1-D array for golden pattern.
248
249    """
250    samples_in_a_period = int(rate / freq) + 1
251    samples_in_golden_pattern = samples_in_a_period + block_size
252    golden_x = numpy.linspace(
253            0.0, (samples_in_golden_pattern - 1) * 1.0 / rate,
254            samples_in_golden_pattern)
255    golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
256    return golden_y
257
258
259def _moving_pattern_matching(golden_signal, test_signal, threshold):
260    """Checks if test_signal is similar to any block of golden_signal.
261
262    Compares test signal with each block of golden signal by correlation
263    index. If there is any block of golden signal that is similar to
264    test signal, then it is matched.
265
266    @param golden_signal: A 1-D array for golden signal.
267    @param test_signal: A 1-D array for test signal.
268    @param threshold: The threshold of correlation index to be judge as matched.
269
270    @returns: True if there is a match. False otherwise.
271
272    @raises: ValueError: if test signal is longer than golden signal.
273
274    """
275    if len(golden_signal) < len(test_signal):
276        raise ValueError('Test signal is longer than golden signal')
277
278    block_length = len(test_signal)
279    number_of_movings = len(golden_signal) - block_length + 1
280    correlation_indices = []
281    for moving_index in xrange(number_of_movings):
282        # Cuts one block of golden signal from start index.
283        # The block length is the same as test signal.
284        start = moving_index
285        end = start + block_length
286        golden_signal_block = golden_signal[start:end]
287        try:
288            correlation_index = _get_correlation_index(
289                    golden_signal_block, test_signal)
290        except TestSignalNormTooSmallError:
291            logging.info('Caught one block of test signal that has no meaningful norm')
292            return False
293        correlation_indices.append(correlation_index)
294
295    # Checks if the maximum correlation index is high enough.
296    max_corr = max(correlation_indices)
297    if max_corr < threshold:
298        logging.debug('Got one unmatched block with max_corr: %s', max_corr)
299        return False
300    return True
301
302
303class GoldenSignalNormTooSmallError(Exception):
304    """Exception when golden signal norm is too small."""
305    pass
306
307
308class TestSignalNormTooSmallError(Exception):
309    """Exception when test signal norm is too small."""
310    pass
311
312
313_MINIMUM_SIGNAL_NORM = 0.001
314
315def _get_correlation_index(golden_signal, test_signal):
316    """Computes correlation index of two signal of same length.
317
318    @param golden_signal: An 1-D array-like object.
319    @param test_signal: An 1-D array-like object.
320
321    @raises: ValueError: if two signal have different lengths.
322    @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small
323    @raises: TestSignalNormTooSmallError: if test signal norm is too small.
324
325    @returns: The correlation index.
326    """
327    if len(golden_signal) != len(test_signal):
328        raise ValueError(
329                'Only accepts signal of same length: %s, %s' % (
330                        len(golden_signal), len(test_signal)))
331
332    norm_golden = numpy.linalg.norm(golden_signal)
333    norm_test = numpy.linalg.norm(test_signal)
334    if norm_golden <= _MINIMUM_SIGNAL_NORM:
335        raise GoldenSignalNormTooSmallError(
336                'No meaningful data as norm is too small.')
337    if norm_test <= _MINIMUM_SIGNAL_NORM:
338        raise TestSignalNormTooSmallError(
339                'No meaningful data as norm is too small.')
340
341    # The 'valid' cross correlation result of two signals of same length will
342    # contain only one number.
343    correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
344    return correlation / (norm_golden * norm_test)
345