1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""This module provides utilities to do audio data analysis."""
17
18import logging
19import numpy
20import operator
21
22# The default block size of pattern matching.
23ANOMALY_DETECTION_BLOCK_SIZE = 120
24
25# Only peaks with coefficient greater than 0.01 of the first peak should be
26# considered. Note that this correspond to -40dB in the spectrum.
27DEFAULT_MIN_PEAK_RATIO = 0.01
28
29# The minimum RMS value of meaningful audio data.
30MEANINGFUL_RMS_THRESHOLD = 0.001
31
32# The minimal signal norm value.
33_MINIMUM_SIGNAL_NORM = 0.001
34
35# The default pattern mathing threshold. By experiment, this threshold
36# can tolerate normal noise of 0.3 amplitude when sine wave signal
37# amplitude is 1.
38PATTERN_MATCHING_THRESHOLD = 0.85
39
40# Window size for peak detection.
41PEAK_WINDOW_SIZE_HZ = 20
42
43
44class RMSTooSmallError(Exception):
45    """Error when signal RMS is too small."""
46    pass
47
48
49class EmptyDataError(Exception):
50    """Error when signal is empty."""
51    pass
52
53
54def normalize_signal(signal, saturate_value):
55    """Normalizes the signal with respect to the saturate value.
56
57    Args:
58        signal: A list for one-channel PCM data.
59        saturate_value: The maximum value that the PCM data might be.
60
61    Returns:
62        A numpy array containing normalized signal. The normalized signal has
63            value -1 and 1 when it saturates.
64
65    """
66    signal = numpy.array(signal)
67    return signal / float(saturate_value)
68
69
70def spectral_analysis(signal,
71                      rate,
72                      min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
73                      peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
74    """Gets the dominant frequencies by spectral analysis.
75
76    Args:
77        signal: A list of numbers for one-channel PCM data. This should be
78                   normalized to [-1, 1] so the function can check if signal RMS
79                   is too small to be meaningful.
80        rate: Sampling rate in samples per second. Example inputs: 44100,
81        48000
82        min_peak_ratio: The minimum peak_i/peak_0 ratio such that the
83                           peaks other than the greatest one should be
84                           considered.
85                           This is to ignore peaks that are too small compared
86                           to the first peak peak_0.
87        peak_window_size_hz: The window size in Hz to find the peaks.
88                                The minimum differences between found peaks will
89                                be half of this value.
90
91    Returns:
92        A list of tuples:
93              [(peak_frequency_0, peak_coefficient_0),
94               (peak_frequency_1, peak_coefficient_1),
95               (peak_frequency_2, peak_coefficient_2), ...]
96              where the tuples are sorted by coefficients. The last
97              peak_coefficient will be no less than peak_coefficient *
98              min_peak_ratio. If RMS is less than MEANINGFUL_RMS_THRESHOLD,
99              returns [(0, 0)].
100
101    """
102    # Checks the signal is meaningful.
103    if len(signal) == 0:
104        raise EmptyDataError('Signal data is empty')
105
106    signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
107    logging.debug('signal RMS = %s', signal_rms)
108
109    # If RMS is too small, set dominant frequency and coefficient to 0.
110    if signal_rms < MEANINGFUL_RMS_THRESHOLD:
111        logging.warning(
112            'RMS %s is too small to be meaningful. Set frequency to 0.',
113            signal_rms)
114        return [(0, 0)]
115
116    logging.debug('Doing spectral analysis ...')
117
118    # First, pass signal through a window function to mitigate spectral leakage.
119    y_conv_w = signal * numpy.hanning(len(signal))
120
121    length = len(y_conv_w)
122
123    # x_f is the frequency in Hz, y_f is the transformed coefficient.
124    x_f = _rfft_freq(length, rate)
125    y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
126
127    # y_f is complex so consider its absolute value for magnitude.
128    abs_y_f = numpy.abs(y_f)
129    threshold = max(abs_y_f) * min_peak_ratio
130
131    # Suppresses all coefficients that are below threshold.
132    for i in range(len(abs_y_f)):
133        if abs_y_f[i] < threshold:
134            abs_y_f[i] = 0
135
136    # Gets the peak detection window size in indice.
137    # x_f[1] is the frequency difference per index.
138    peak_window_size = int(peak_window_size_hz / x_f[1])
139
140    # Detects peaks.
141    peaks = peak_detection(abs_y_f, peak_window_size)
142
143    # Transform back the peak location from index to frequency.
144    results = []
145    for index, value in peaks:
146        results.append((x_f[int(index)], value))
147    return results
148
149
150def _rfft_freq(length, rate):
151    """Gets the frequency at each index of real FFT.
152
153    Args:
154        length: The window length of FFT.
155        rate: Sampling rate in samples per second. Example inputs: 44100,
156        48000
157
158    Returns:
159        A numpy array containing frequency corresponding to numpy.fft.rfft
160            result at each index.
161
162    """
163    # The difference in Hz between each index.
164    val = rate / float(length)
165    # Only care half of frequencies for FFT on real signal.
166    result_length = length // 2 + 1
167    return numpy.linspace(0, (result_length - 1) * val, result_length)
168
169
170def peak_detection(array, window_size):
171    """Detects peaks in an array.
172
173    A point (i, array[i]) is a peak if array[i] is the maximum among
174    array[i - half_window_size] to array[i + half_window_size].
175    If array[i - half_window_size] to array[i + half_window_size] are all equal,
176    then there is no peak in this window.
177    Note that we only consider peak with value greater than 0.
178
179    Args:
180        array: The input array to detect peaks in. Array is a list of
181        absolute values of the magnitude of transformed coefficient.
182
183        window_size: The window to detect peaks.
184
185    Returns:
186        A list of tuples:
187              [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
188              where the tuples are sorted by peak values.
189
190    """
191    half_window_size = window_size / 2
192    length = len(array)
193
194    def mid_is_peak(array, mid, left, right):
195        """Checks if value at mid is the largest among left to right in array.
196
197        Args:
198            array: A list of numbers.
199            mid: The mid index.
200            left: The left index.
201            rigth: The right index.
202
203        Returns:
204            A tuple (is_peak, next_candidate)
205                  is_peak is True if array[index] is the maximum among numbers
206                  in array between index [left, right] inclusively.
207                  next_candidate is the index of next candidate for peak if
208                  is_peak is False. It is the index of maximum value in
209                  [mid + 1, right]. If is_peak is True, next_candidate is
210                  right + 1.
211
212        """
213        value_mid = array[int(mid)]
214        is_peak = True
215        next_peak_candidate_index = None
216
217        # Check the left half window.
218        for index in range(int(left), int(mid)):
219            if array[index] >= value_mid:
220                is_peak = False
221                break
222
223        # Mid is at the end of array.
224        if mid == right:
225            return is_peak, right + 1
226
227        # Check the right half window and also record next candidate.
228        # Favor the larger index for next_peak_candidate_index.
229        for index in range(int(right), int(mid), -1):
230            if (next_peak_candidate_index is None or
231                    array[index] > array[next_peak_candidate_index]):
232                next_peak_candidate_index = index
233
234        if array[next_peak_candidate_index] >= value_mid:
235            is_peak = False
236
237        if is_peak:
238            next_peak_candidate_index = right + 1
239
240        return is_peak, next_peak_candidate_index
241
242    results = []
243    mid = 0
244    next_candidate_idx = None
245    while mid < length:
246        left = max(0, mid - half_window_size)
247        right = min(length - 1, mid + half_window_size)
248
249        # Only consider value greater than 0.
250        if array[int(mid)] == 0:
251            mid = mid + 1
252            continue
253
254        is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)
255
256        if is_peak:
257            results.append((mid, array[int(mid)]))
258
259        # Use the next candidate found in [mid + 1, right], or right + 1.
260        mid = next_candidate_idx
261
262    # Sort the peaks by values.
263    return sorted(results, key=lambda x: x[1], reverse=True)
264
265
266def anomaly_detection(signal,
267                      rate,
268                      freq,
269                      block_size=ANOMALY_DETECTION_BLOCK_SIZE,
270                      threshold=PATTERN_MATCHING_THRESHOLD):
271    """Detects anomaly in a sine wave signal.
272
273    This method detects anomaly in a sine wave signal by matching
274    patterns of each block.
275    For each moving window of block in the test signal, checks if there
276    is any block in golden signal that is similar to this block of test signal.
277    If there is such a block in golden signal, then this block of test
278    signal is matched and there is no anomaly in this block of test signal.
279    If there is any block in test signal that is not matched, then this block
280    covers an anomaly.
281    The block of test signal starts from index 0, and proceeds in steps of
282    half block size. The overlapping of test signal blocks makes sure there must
283    be at least one block covering the transition from sine wave to anomaly.
284
285    Args:
286        signal: A 1-D array-like object for 1-channel PCM data.
287        rate: Sampling rate in samples per second. Example inputs: 44100,
288        48000
289        freq: The expected frequency of signal.
290        block_size: The block size in samples to detect anomaly.
291        threshold: The threshold of correlation index to be judge as matched.
292
293    Returns:
294        A list containing detected anomaly time in seconds.
295
296    """
297    if len(signal) == 0:
298        raise EmptyDataError('Signal data is empty')
299
300    golden_y = _generate_golden_pattern(rate, freq, block_size)
301
302    results = []
303
304    for start in range(0, len(signal), int(block_size / 2)):
305        end = start + block_size
306        test_signal = signal[start:end]
307        matched = _moving_pattern_matching(golden_y, test_signal, threshold)
308        if not matched:
309            results.append(start)
310
311    results = [float(x) / rate for x in results]
312
313    return results
314
315
316def _generate_golden_pattern(rate, freq, block_size):
317    """Generates a golden pattern of certain frequency.
318
319    The golden pattern must cover all the possibilities of waveforms in a
320    block. So, we need a golden pattern covering 1 period + 1 block size,
321    such that the test block can start anywhere in a period, and extends
322    a block size.
323
324    |period |1 bk|
325    |       |    |
326     . .     . .
327    .   .   .   .
328         . .     .
329
330    Args:
331        rate: Sampling rate in samples per second. Example inputs: 44100,
332        48000
333        freq: The frequency of golden pattern.
334        block_size: The block size in samples to detect anomaly.
335
336    Returns:
337        A 1-D array for golden pattern.
338
339    """
340    samples_in_a_period = int(rate / freq) + 1
341    samples_in_golden_pattern = samples_in_a_period + block_size
342    golden_x = numpy.linspace(0.0, (samples_in_golden_pattern - 1) * 1.0 /
343                              rate, samples_in_golden_pattern)
344    golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
345    return golden_y
346
347
348def _moving_pattern_matching(golden_signal, test_signal, threshold):
349    """Checks if test_signal is similar to any block of golden_signal.
350
351    Compares test signal with each block of golden signal by correlation
352    index. If there is any block of golden signal that is similar to
353    test signal, then it is matched.
354
355    Args:
356        golden_signal: A 1-D array for golden signal.
357        test_signal: A 1-D array for test signal.
358        threshold: The threshold of correlation index to be judge as matched.
359
360    Returns:
361        True if there is a match. False otherwise.
362
363        ValueError: if test signal is longer than golden signal.
364
365    """
366    if len(golden_signal) < len(test_signal):
367        raise ValueError('Test signal is longer than golden signal')
368
369    block_length = len(test_signal)
370    number_of_movings = len(golden_signal) - block_length + 1
371    correlation_indices = []
372    for moving_index in range(number_of_movings):
373        # Cuts one block of golden signal from start index.
374        # The block length is the same as test signal.
375        start = moving_index
376        end = start + block_length
377        golden_signal_block = golden_signal[start:end]
378        try:
379            correlation_index = _get_correlation_index(golden_signal_block,
380                                                       test_signal)
381        except TestSignalNormTooSmallError:
382            logging.info(
383                'Caught one block of test signal that has no meaningful norm')
384            return False
385        correlation_indices.append(correlation_index)
386
387    # Checks if the maximum correlation index is high enough.
388    max_corr = max(correlation_indices)
389    if max_corr < threshold:
390        logging.debug('Got one unmatched block with max_corr: %s', max_corr)
391        return False
392    return True
393
394
395class GoldenSignalNormTooSmallError(Exception):
396    """Exception when golden signal norm is too small."""
397    pass
398
399
400class TestSignalNormTooSmallError(Exception):
401    """Exception when test signal norm is too small."""
402    pass
403
404
405def _get_correlation_index(golden_signal, test_signal):
406    """Computes correlation index of two signal of same length.
407
408    Args:
409        golden_signal: An 1-D array-like object.
410        test_signal: An 1-D array-like object.
411
412    Raises:
413        ValueError: if two signal have different lengths.
414        GoldenSignalNormTooSmallError: if golden signal norm is too small
415        TestSignalNormTooSmallError: if test signal norm is too small.
416
417    Returns:
418        The correlation index.
419    """
420    if len(golden_signal) != len(test_signal):
421        raise ValueError('Only accepts signal of same length: %s, %s' %
422                         (len(golden_signal), len(test_signal)))
423
424    norm_golden = numpy.linalg.norm(golden_signal)
425    norm_test = numpy.linalg.norm(test_signal)
426    if norm_golden <= _MINIMUM_SIGNAL_NORM:
427        raise GoldenSignalNormTooSmallError(
428            'No meaningful data as norm is too small.')
429    if norm_test <= _MINIMUM_SIGNAL_NORM:
430        raise TestSignalNormTooSmallError(
431            'No meaningful data as norm is too small.')
432
433    # The 'valid' cross correlation result of two signals of same length will
434    # contain only one number.
435    correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
436    return correlation / (norm_golden * norm_test)
437