1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides utilities to do audio data analysis."""
6
7import logging
8import numpy
9import operator
10
11# Only peaks with coefficient greater than 0.01 of the first peak should be
12# considered. Note that this correspond to -40dB in the spectrum.
13DEFAULT_MIN_PEAK_RATIO = 0.01
14
15PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection.
16
17# The minimum RMS value of meaningful audio data.
18MEANINGFUL_RMS_THRESHOLD = 0.001
19
20class RMSTooSmallError(Exception):
21    """Error when signal RMS is too small."""
22    pass
23
24
25class EmptyDataError(Exception):
26    """Error when signal is empty."""
27
28
29def normalize_signal(signal, saturate_value):
30    """Normalizes the signal with respect to the saturate value.
31
32    @param signal: A list for one-channel PCM data.
33    @param saturate_value: The maximum value that the PCM data might be.
34
35    @returns: A numpy array containing normalized signal. The normalized signal
36              has value -1 and 1 when it saturates.
37
38    """
39    signal = numpy.array(signal)
40    return signal / float(saturate_value)
41
42
43def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
44                      peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
45    """Gets the dominant frequencies by spectral analysis.
46
47    @param signal: A list of numbers for one-channel PCM data. This should be
48                   normalized to [-1, 1] so the function can check if signal RMS
49                   is too small to be meaningful.
50    @param rate: Sampling rate.
51    @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the
52                           peaks other than the greatest one should be
53                           considered.
54                           This is to ignore peaks that are too small compared
55                           to the first peak peak_0.
56    @param peak_window_size_hz: The window size in Hz to find the peaks.
57                                The minimum differences between found peaks will
58                                be half of this value.
59
60    @returns: A list of tuples:
61              [(peak_frequency_0, peak_coefficient_0),
62               (peak_frequency_1, peak_coefficient_1),
63               (peak_frequency_2, peak_coefficient_2), ...]
64              where the tuples are sorted by coefficients.
65              The last peak_coefficient will be no less than
66              peak_coefficient * min_peak_ratio.
67              If RMS is less than MEANINGFUL_RMS_THRESHOLD, returns [(0, 0)].
68
69    """
70    # Checks the signal is meaningful.
71    if len(signal) == 0:
72        raise EmptyDataError('Signal data is empty')
73
74    signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
75    logging.debug('signal RMS = %s', signal_rms)
76
77    # If RMS is too small, set dominant frequency and coefficient to 0.
78    if signal_rms < MEANINGFUL_RMS_THRESHOLD:
79        logging.warning(
80                'RMS %s is too small to be meaningful. Set frequency to 0.',
81                signal_rms)
82        return [(0, 0)]
83
84    logging.debug('Doing spectral analysis ...')
85
86    # First, pass signal through a window function to mitigate spectral leakage.
87    y_conv_w = signal * numpy.hanning(len(signal))
88
89    length = len(y_conv_w)
90
91    # x_f is the frequency in Hz, y_f is the transformed coefficient.
92    x_f = _rfft_freq(length, rate)
93    y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
94
95    # y_f is complex so consider its absolute value for magnitude.
96    abs_y_f = numpy.abs(y_f)
97    threshold = max(abs_y_f) * min_peak_ratio
98
99    # Suppresses all coefficients that are below threshold.
100    for i in xrange(len(abs_y_f)):
101        if abs_y_f[i] < threshold:
102            abs_y_f[i] = 0
103
104    # Gets the peak detection window size in indice.
105    # x_f[1] is the frequency difference per index.
106    peak_window_size = int(peak_window_size_hz / x_f[1])
107
108    # Detects peaks.
109    peaks = peak_detection(abs_y_f, peak_window_size)
110
111    # Transform back the peak location from index to frequency.
112    results = []
113    for index, value in peaks:
114        results.append((x_f[index], value))
115    return results
116
117
118def _rfft_freq(length, rate):
119    """Gets the frequency at each index of real FFT.
120
121    @param length: The window length of FFT.
122    @param rate: Sampling rate.
123
124    @returns: A numpy array containing frequency corresponding to
125              numpy.fft.rfft result at each index.
126
127    """
128    # The difference in Hz between each index.
129    val = rate / float(length)
130    # Only care half of frequencies for FFT on real signal.
131    result_length = length // 2 + 1
132    return numpy.linspace(0, (result_length - 1) * val, result_length)
133
134
135def peak_detection(array, window_size):
136    """Detects peaks in an array.
137
138    A point (i, array[i]) is a peak if array[i] is the maximum among
139    array[i - half_window_size] to array[i + half_window_size].
140    If array[i - half_window_size] to array[i + half_window_size] are all equal,
141    then there is no peak in this window.
142    Note that we only consider peak with value greater than 0.
143
144    @param window_size: The window to detect peaks.
145
146    @returns: A list of tuples:
147              [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
148              where the tuples are sorted by peak values.
149
150    """
151    half_window_size = window_size / 2
152    length = len(array)
153
154    def mid_is_peak(array, mid, left, right):
155        """Checks if value at mid is the largest among left to right in array.
156
157        @param array: A list of numbers.
158        @param mid: The mid index.
159        @param left: The left index.
160        @param rigth: The right index.
161
162        @returns: A tuple (is_peak, next_candidate)
163                  is_peak is True if array[index] is the maximum among numbers
164                  in array between index [left, right] inclusively.
165                  next_candidate is the index of next candidate for peak if
166                  is_peak is False. It is the index of maximum value in
167                  [mid + 1, right]. If is_peak is True, next_candidate is
168                  right + 1.
169
170        """
171        value_mid = array[mid]
172        is_peak = True
173        next_peak_candidate_index = None
174
175        # Check the left half window.
176        for index in xrange(left, mid):
177            if array[index] >= value_mid:
178                is_peak = False
179                break
180
181        # Mid is at the end of array.
182        if mid == right:
183            return is_peak, right + 1
184
185        # Check the right half window and also record next candidate.
186        # Favor the larger index for next_peak_candidate_index.
187        for index in xrange(right, mid, -1):
188            if (next_peak_candidate_index is None or
189                array[index] > array[next_peak_candidate_index]):
190                next_peak_candidate_index = index
191
192        if array[next_peak_candidate_index] >= value_mid:
193            is_peak = False
194
195        if is_peak:
196            next_peak_candidate_index = right + 1
197
198        return is_peak, next_peak_candidate_index
199
200    results = []
201    mid = 0
202    next_candidate_idx = None
203    while mid < length:
204        left = max(0, mid - half_window_size)
205        right = min(length - 1, mid + half_window_size)
206
207        # Only consider value greater than 0.
208        if array[mid] == 0:
209            mid = mid + 1
210            continue;
211
212        is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)
213
214        if is_peak:
215            results.append((mid, array[mid]))
216
217        # Use the next candidate found in [mid + 1, right], or right + 1.
218        mid = next_candidate_idx
219
220    # Sort the peaks by values.
221    return sorted(results, key=lambda x: x[1], reverse=True)
222
223
224# The default pattern mathing threshold. By experiment, this threshold
225# can tolerate normal noise of 0.3 amplitude when sine wave signal
226# amplitude is 1.
227PATTERN_MATCHING_THRESHOLD = 0.85
228
229# The default block size of pattern matching.
230ANOMALY_DETECTION_BLOCK_SIZE = 120
231
232def anomaly_detection(signal, rate, freq,
233                      block_size=ANOMALY_DETECTION_BLOCK_SIZE,
234                      threshold=PATTERN_MATCHING_THRESHOLD):
235    """Detects anomaly in a sine wave signal.
236
237    This method detects anomaly in a sine wave signal by matching
238    patterns of each block.
239    For each moving window of block in the test signal, checks if there
240    is any block in golden signal that is similar to this block of test signal.
241    If there is such a block in golden signal, then this block of test
242    signal is matched and there is no anomaly in this block of test signal.
243    If there is any block in test signal that is not matched, then this block
244    covers an anomaly.
245    The block of test signal starts from index 0, and proceeds in steps of
246    half block size. The overlapping of test signal blocks makes sure there must
247    be at least one block covering the transition from sine wave to anomaly.
248
249    @param signal: A 1-D array-like object for 1-channel PCM data.
250    @param rate: The sampling rate.
251    @param freq: The expected frequency of signal.
252    @param block_size: The block size in samples to detect anomaly.
253    @param threshold: The threshold of correlation index to be judge as matched.
254
255    @returns: A list containing detected anomaly time in seconds.
256
257    """
258    if len(signal) == 0:
259        raise EmptyDataError('Signal data is empty')
260
261    golden_y = _generate_golden_pattern(rate, freq, block_size)
262
263    results = []
264
265    for start in xrange(0, len(signal), block_size / 2):
266        end = start + block_size
267        test_signal = signal[start:end]
268        matched = _moving_pattern_matching(golden_y, test_signal, threshold)
269        if not matched:
270            results.append(start)
271
272    results = [float(x) / rate for x in results]
273
274    return results
275
276
277def _generate_golden_pattern(rate, freq, block_size):
278    """Generates a golden pattern of certain frequency.
279
280    The golden pattern must cover all the possibilities of waveforms in a
281    block. So, we need a golden pattern covering 1 period + 1 block size,
282    such that the test block can start anywhere in a period, and extends
283    a block size.
284
285    |period |1 bk|
286    |       |    |
287     . .     . .
288    .   .   .   .
289         . .     .
290
291    @param rate: The sampling rate.
292    @param freq: The frequency of golden pattern.
293    @param block_size: The block size in samples to detect anomaly.
294
295    @returns: A 1-D array for golden pattern.
296
297    """
298    samples_in_a_period = int(rate / freq) + 1
299    samples_in_golden_pattern = samples_in_a_period + block_size
300    golden_x = numpy.linspace(
301            0.0, (samples_in_golden_pattern - 1) * 1.0 / rate,
302            samples_in_golden_pattern)
303    golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
304    return golden_y
305
306
307def _moving_pattern_matching(golden_signal, test_signal, threshold):
308    """Checks if test_signal is similar to any block of golden_signal.
309
310    Compares test signal with each block of golden signal by correlation
311    index. If there is any block of golden signal that is similar to
312    test signal, then it is matched.
313
314    @param golden_signal: A 1-D array for golden signal.
315    @param test_signal: A 1-D array for test signal.
316    @param threshold: The threshold of correlation index to be judge as matched.
317
318    @returns: True if there is a match. False otherwise.
319
320    @raises: ValueError: if test signal is longer than golden signal.
321
322    """
323    if len(golden_signal) < len(test_signal):
324        raise ValueError('Test signal is longer than golden signal')
325
326    block_length = len(test_signal)
327    number_of_movings = len(golden_signal) - block_length + 1
328    correlation_indices = []
329    for moving_index in xrange(number_of_movings):
330        # Cuts one block of golden signal from start index.
331        # The block length is the same as test signal.
332        start = moving_index
333        end = start + block_length
334        golden_signal_block = golden_signal[start:end]
335        try:
336            correlation_index = _get_correlation_index(
337                    golden_signal_block, test_signal)
338        except TestSignalNormTooSmallError:
339            logging.info('Caught one block of test signal that has no meaningful norm')
340            return False
341        correlation_indices.append(correlation_index)
342
343    # Checks if the maximum correlation index is high enough.
344    max_corr = max(correlation_indices)
345    if max_corr < threshold:
346        logging.debug('Got one unmatched block with max_corr: %s', max_corr)
347        return False
348    return True
349
350
351class GoldenSignalNormTooSmallError(Exception):
352    """Exception when golden signal norm is too small."""
353    pass
354
355
356class TestSignalNormTooSmallError(Exception):
357    """Exception when test signal norm is too small."""
358    pass
359
360
361_MINIMUM_SIGNAL_NORM = 0.001
362
363def _get_correlation_index(golden_signal, test_signal):
364    """Computes correlation index of two signal of same length.
365
366    @param golden_signal: An 1-D array-like object.
367    @param test_signal: An 1-D array-like object.
368
369    @raises: ValueError: if two signal have different lengths.
370    @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small
371    @raises: TestSignalNormTooSmallError: if test signal norm is too small.
372
373    @returns: The correlation index.
374    """
375    if len(golden_signal) != len(test_signal):
376        raise ValueError(
377                'Only accepts signal of same length: %s, %s' % (
378                        len(golden_signal), len(test_signal)))
379
380    norm_golden = numpy.linalg.norm(golden_signal)
381    norm_test = numpy.linalg.norm(test_signal)
382    if norm_golden <= _MINIMUM_SIGNAL_NORM:
383        raise GoldenSignalNormTooSmallError(
384                'No meaningful data as norm is too small.')
385    if norm_test <= _MINIMUM_SIGNAL_NORM:
386        raise TestSignalNormTooSmallError(
387                'No meaningful data as norm is too small.')
388
389    # The 'valid' cross correlation result of two signals of same length will
390    # contain only one number.
391    correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
392    return correlation / (norm_golden * norm_test)
393