1# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
2#
3# Use of this source code is governed by a BSD-style license
4# that can be found in the LICENSE file in the root of the source
5# tree. An additional intellectual property rights grant can be found
6# in the file PATENTS.  All contributing project authors may
7# be found in the AUTHORS file in the root of the source tree.
8
9"""Signal processing utility module.
10"""
11
12import array
13import logging
14import os
15import sys
16import enum
17
18try:
19  import numpy as np
20except ImportError:
21  logging.critical('Cannot import the third-party Python package numpy')
22  sys.exit(1)
23
24try:
25  import pydub
26  import pydub.generators
27except ImportError:
28  logging.critical('Cannot import the third-party Python package pydub')
29  sys.exit(1)
30
31try:
32  import scipy.signal
33  import scipy.fftpack
34except ImportError:
35  logging.critical('Cannot import the third-party Python package scipy')
36  sys.exit(1)
37
38from . import exceptions
39
40
41class SignalProcessingUtils(object):
42  """Collection of signal processing utilities.
43  """
44
  @enum.unique
  class MixPadding(enum.Enum):
    """Ways MixSignals() can handle a |noise| shorter than |signal|."""
    # |noise| is not extended; the mix is as long as |noise|.
    NO_PADDING = 0
    # |noise| is overlaid once; the rest of |signal| is left without noise.
    ZERO_PADDING = 1
    # |noise| is repeated (looped) until the end of |signal|.
    LOOP = 2
50
  def __init__(self):
    """No-op constructor; the utilities of this class are class methods."""
    pass
53
54  @classmethod
55  def LoadWav(cls, filepath, channels=1):
56    """Loads wav file.
57
58    Args:
59      filepath: path to the wav audio track file to load.
60      channels: number of channels (downmixing to mono by default).
61
62    Returns:
63      AudioSegment instance.
64    """
65    if not os.path.exists(filepath):
66      logging.error('cannot find the <%s> audio track file', filepath)
67      raise exceptions.FileNotFoundError()
68    return pydub.AudioSegment.from_file(
69        filepath, format='wav', channels=channels)
70
71  @classmethod
72  def SaveWav(cls, output_filepath, signal):
73    """Saves wav file.
74
75    Args:
76      output_filepath: path to the wav audio track file to save.
77      signal: AudioSegment instance.
78    """
79    return signal.export(output_filepath, format='wav')
80
81  @classmethod
82  def CountSamples(cls, signal):
83    """Number of samples per channel.
84
85    Args:
86      signal: AudioSegment instance.
87
88    Returns:
89      An integer.
90    """
91    number_of_samples = len(signal.get_array_of_samples())
92    assert signal.channels > 0
93    assert number_of_samples % signal.channels == 0
94    return number_of_samples / signal.channels
95
96  @classmethod
97  def GenerateSilence(cls, duration=1000, sample_rate=48000):
98    """Generates silence.
99
100    This method can also be used to create a template AudioSegment instance.
101    A template can then be used with other Generate*() methods accepting an
102    AudioSegment instance as argument.
103
104    Args:
105      duration: duration in ms.
106      sample_rate: sample rate.
107
108    Returns:
109      AudioSegment instance.
110    """
111    return pydub.AudioSegment.silent(duration, sample_rate)
112
113  @classmethod
114  def GeneratePureTone(cls, template, frequency=440.0):
115    """Generates a pure tone.
116
117    The pure tone is generated with the same duration and in the same format of
118    the given template signal.
119
120    Args:
121      template: AudioSegment instance.
122      frequency: Frequency of the pure tone in Hz.
123
124    Return:
125      AudioSegment instance.
126    """
127    if frequency > template.frame_rate >> 1:
128      raise exceptions.SignalProcessingException('Invalid frequency')
129
130    generator = pydub.generators.Sine(
131        sample_rate=template.frame_rate,
132        bit_depth=template.sample_width * 8,
133        freq=frequency)
134
135    return generator.to_audio_segment(
136        duration=len(template),
137        volume=0.0)
138
139  @classmethod
140  def GenerateWhiteNoise(cls, template):
141    """Generates white noise.
142
143    The white noise is generated with the same duration and in the same format
144    of the given template signal.
145
146    Args:
147      template: AudioSegment instance.
148
149    Return:
150      AudioSegment instance.
151    """
152    generator = pydub.generators.WhiteNoise(
153        sample_rate=template.frame_rate,
154        bit_depth=template.sample_width * 8)
155    return generator.to_audio_segment(
156        duration=len(template),
157        volume=0.0)
158
159  @classmethod
160  def AudioSegmentToRawData(cls, signal):
161    samples = signal.get_array_of_samples()
162    if samples.typecode != 'h':
163      raise exceptions.SignalProcessingException('Unsupported samples type')
164    return np.array(signal.get_array_of_samples(), np.int16)
165
166  @classmethod
167  def Fft(cls, signal, normalize=True):
168    if signal.channels != 1:
169      raise NotImplementedError('multiple-channel FFT not implemented')
170    x = cls.AudioSegmentToRawData(signal).astype(np.float32)
171    if normalize:
172      x /= max(abs(np.max(x)), 1.0)
173    y = scipy.fftpack.fft(x)
174    return y[:len(y) / 2]
175
176  @classmethod
177  def DetectHardClipping(cls, signal, threshold=2):
178    """Detects hard clipping.
179
180    Hard clipping is simply detected by counting samples that touch either the
181    lower or upper bound too many times in a row (according to |threshold|).
182    The presence of a single sequence of samples meeting such property is enough
183    to label the signal as hard clipped.
184
185    Args:
186      signal: AudioSegment instance.
187      threshold: minimum number of samples at full-scale in a row.
188
189    Returns:
190      True if hard clipping is detect, False otherwise.
191    """
192    if signal.channels != 1:
193      raise NotImplementedError('multiple-channel clipping not implemented')
194    if signal.sample_width != 2:  # Note that signal.sample_width is in bytes.
195      raise exceptions.SignalProcessingException(
196          'hard-clipping detection only supported for 16 bit samples')
197    samples = cls.AudioSegmentToRawData(signal)
198
199    # Detect adjacent clipped samples.
200    samples_type_info = np.iinfo(samples.dtype)
201    mask_min = samples == samples_type_info.min
202    mask_max = samples == samples_type_info.max
203
204    def HasLongSequence(vector, min_legth=threshold):
205      """Returns True if there are one or more long sequences of True flags."""
206      seq_length = 0
207      for b in vector:
208        seq_length = seq_length + 1 if b else 0
209        if seq_length >= min_legth:
210          return True
211      return False
212
213    return HasLongSequence(mask_min) or HasLongSequence(mask_max)
214
215  @classmethod
216  def ApplyImpulseResponse(cls, signal, impulse_response):
217    """Applies an impulse response to a signal.
218
219    Args:
220      signal: AudioSegment instance.
221      impulse_response: list or numpy vector of float values.
222
223    Returns:
224      AudioSegment instance.
225    """
226    # Get samples.
227    assert signal.channels == 1, (
228        'multiple-channel recordings not supported')
229    samples = signal.get_array_of_samples()
230
231    # Convolve.
232    logging.info('applying %d order impulse response to a signal lasting %d ms',
233                 len(impulse_response), len(signal))
234    convolved_samples = scipy.signal.fftconvolve(
235        in1=samples,
236        in2=impulse_response,
237        mode='full').astype(np.int16)
238    logging.info('convolution computed')
239
240    # Cast.
241    convolved_samples = array.array(signal.array_type, convolved_samples)
242
243    # Verify.
244    logging.debug('signal length: %d samples', len(samples))
245    logging.debug('convolved signal length: %d samples', len(convolved_samples))
246    assert len(convolved_samples) > len(samples)
247
248    # Generate convolved signal AudioSegment instance.
249    convolved_signal = pydub.AudioSegment(
250        data=convolved_samples,
251        metadata={
252            'sample_width': signal.sample_width,
253            'frame_rate': signal.frame_rate,
254            'frame_width': signal.frame_width,
255            'channels': signal.channels,
256        })
257    assert len(convolved_signal) > len(signal)
258
259    return convolved_signal
260
261  @classmethod
262  def Normalize(cls, signal):
263    """Normalizes a signal.
264
265    Args:
266      signal: AudioSegment instance.
267
268    Returns:
269      An AudioSegment instance.
270    """
271    return signal.apply_gain(-signal.max_dBFS)
272
273  @classmethod
274  def Copy(cls, signal):
275    """Makes a copy os a signal.
276
277    Args:
278      signal: AudioSegment instance.
279
280    Returns:
281      An AudioSegment instance.
282    """
283    return pydub.AudioSegment(
284        data=signal.get_array_of_samples(),
285        metadata={
286            'sample_width': signal.sample_width,
287            'frame_rate': signal.frame_rate,
288            'frame_width': signal.frame_width,
289            'channels': signal.channels,
290        })
291
292  @classmethod
293  def MixSignals(cls, signal, noise, target_snr=0.0,
294                 pad_noise=MixPadding.NO_PADDING):
295    """Mixes |signal| and |noise| with a target SNR.
296
297    Mix |signal| and |noise| with a desired SNR by scaling |noise|.
298    If the target SNR is +/- infinite, a copy of signal/noise is returned.
299    If |signal| is shorter than |noise|, the length of the mix equals that of
300    |signal|. Otherwise, the mix length depends on whether padding is applied.
301    When padding is not applied, that is |pad_noise| is set to NO_PADDING
302    (default), the mix length equals that of |noise| - i.e., |signal| is
303    truncated. Otherwise, |noise| is extended and the resulting mix has the same
304    length of |signal|.
305
306    Args:
307      signal: AudioSegment instance (signal).
308      noise: AudioSegment instance (noise).
309      target_snr: float, numpy.Inf or -numpy.Inf (dB).
310      pad_noise: SignalProcessingUtils.MixPadding, default: NO_PADDING.
311
312    Returns:
313      An AudioSegment instance.
314    """
315    # Handle infinite target SNR.
316    if target_snr == -np.Inf:
317      # Return a copy of noise.
318      logging.warning('SNR = -Inf, returning noise')
319      return cls.Copy(noise)
320    elif target_snr == np.Inf:
321      # Return a copy of signal.
322      logging.warning('SNR = +Inf, returning signal')
323      return cls.Copy(signal)
324
325    # Check signal and noise power.
326    signal_power = float(signal.dBFS)
327    noise_power = float(noise.dBFS)
328    if signal_power == -np.Inf:
329      logging.error('signal has -Inf power, cannot mix')
330      raise exceptions.SignalProcessingException(
331          'cannot mix a signal with -Inf power')
332    if noise_power == -np.Inf:
333      logging.error('noise has -Inf power, cannot mix')
334      raise exceptions.SignalProcessingException(
335          'cannot mix a signal with -Inf power')
336
337    # Mix.
338    gain_db = signal_power - noise_power - target_snr
339    signal_duration = len(signal)
340    noise_duration = len(noise)
341    if signal_duration <= noise_duration:
342      # Ignore |pad_noise|, |noise| is truncated if longer that |signal|, the
343      # mix will have the same length of |signal|.
344      return signal.overlay(noise.apply_gain(gain_db))
345    elif pad_noise == cls.MixPadding.NO_PADDING:
346      # |signal| is longer than |noise|, but no padding is applied to |noise|.
347      # Truncate |signal|.
348      return noise.overlay(signal, gain_during_overlay=gain_db)
349    elif pad_noise == cls.MixPadding.ZERO_PADDING:
350      # TODO(alessiob): Check that this works as expected.
351      return signal.overlay(noise.apply_gain(gain_db))
352    elif pad_noise == cls.MixPadding.LOOP:
353      # |signal| is longer than |noise|, extend |noise| by looping.
354      return signal.overlay(noise.apply_gain(gain_db), loop=True)
355    else:
356      raise exceptions.SignalProcessingException('invalid padding type')
357