1# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
2#
3# Use of this source code is governed by a BSD-style license
4# that can be found in the LICENSE file in the root of the source
5# tree. An additional intellectual property rights grant can be found
6# in the file PATENTS.  All contributing project authors may
7# be found in the AUTHORS file in the root of the source tree.
8
9"""Test data generators producing signals pairs intended to be used to
10test the APM module. Each pair consists of a noisy input and a reference signal.
11The former is used as APM input and it is generated by adding noise to a
12clean audio track. The reference is the expected APM output.
13
Throughout this file, the following naming convention is used:
  - input signal: the clean signal (e.g., speech),
  - noise signal: the noise to be added to the input signal (e.g., white
    noise, Gaussian noise),
  - noisy signal: input + noise.
19The noise signal may or may not be a function of the clean signal. For
20instance, white noise is independently generated, whereas reverberation is
21obtained by convolving the input signal with an impulse response.
22"""
23
24import logging
25import os
26import shutil
27import sys
28
29try:
30  import scipy.io
31except ImportError:
32  logging.critical('Cannot import the third-party Python package scipy')
33  sys.exit(1)
34
35from . import data_access
36from . import exceptions
37from . import signal_processing
38
39
class TestDataGenerator(object):
  """Abstract base class for the generators of noisy signals.

  Starting from a clean signal, a generator produces two streams: the noisy
  signal and the reference. The noisy signal is the clean signal deteriorated
  by the noise source; the reference undergoes the same deterioration process,
  but more "gently". The two are produced so that the reference is the signal
  expected at the output of the APM module when the noisy signal is fed to it.

  A test data generator generates one or more such pairs.

  Concrete generators set NAME, implement _Generate() and can be registered
  through the RegisterClass() decorator.
  """

  # Key under which a concrete generator class is stored by RegisterClass().
  NAME = None
  # Maps NAME to the registered class; shared by all the subclasses.
  REGISTERED_CLASSES = {}

  def __init__(self, output_directory_prefix):
    # Prepended to the configuration name when an output directory is made.
    self._output_directory_prefix = output_directory_prefix
    # The dictionaries below hold one entry for each test data generator
    # configuration (e.g., different SNRs); Clear() replaces the placeholders
    # with empty dictionaries.
    # Noisy audio track files (stored separately in a cache folder).
    self._noisy_signal_filepaths = None
    # Path to be used for the APM simulation output files.
    self._apm_output_paths = None
    # Reference audio track files (stored separately in a cache folder).
    self._reference_signal_filepaths = None
    self.Clear()

  @classmethod
  def RegisterClass(cls, class_to_register):
    """Registers a TestDataGenerator implementation.

    Meant to be used as a class decorator on the classes that extend
    TestDataGenerator, for instance:

    @TestDataGenerator.RegisterClass
    class IdentityGenerator(TestDataGenerator):
      pass

    Args:
      class_to_register: class to store in REGISTERED_CLASSES under its NAME.

    Returns:
      class_to_register, unmodified.
    """
    cls.REGISTERED_CLASSES[class_to_register.NAME] = class_to_register
    return class_to_register

  @property
  def config_names(self):
    """Names of the configurations for which a pair has been added."""
    return self._noisy_signal_filepaths.keys()

  @property
  def noisy_signal_filepaths(self):
    """Dictionary mapping configuration names to noisy signal paths."""
    return self._noisy_signal_filepaths

  @property
  def apm_output_paths(self):
    """Dictionary mapping configuration names to APM output paths."""
    return self._apm_output_paths

  @property
  def reference_signal_filepaths(self):
    """Dictionary mapping configuration names to reference signal paths."""
    return self._reference_signal_filepaths

  def Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Generates a set of noisy input and reference audiotrack file pairs.

    Any previously generated pair is forgotten first; the actual work is then
    delegated to the _Generate() method of the concrete class.

    Args:
      input_signal_filepath: path to the clean input audio track file.
      test_data_cache_path: path to the cache of the generated audio track
                            files.
      base_output_path: base path where output is written.
    """
    self.Clear()
    self._Generate(
        input_signal_filepath, test_data_cache_path, base_output_path)

  def Clear(self):
    """Resets the three path dictionaries to new, empty dictionaries.
    """
    self._noisy_signal_filepaths = {}
    self._apm_output_paths = {}
    self._reference_signal_filepaths = {}

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Abstract method; every concrete class must override it.

    Raises:
      NotImplementedError: always, when not overridden.
    """
    raise NotImplementedError()

  def _AddNoiseSnrPairs(self, base_output_path, noisy_mix_filepaths,
                        snr_value_pairs):
    """Adds one noisy-reference pair for each noise track and SNR pair.

    Args:
      base_output_path: noisy tracks base output path.
      noisy_mix_filepaths: nested dictionary of noisy signal paths organized
                           by noisy track name and SNR level.
      snr_value_pairs: list of (noisy SNR, reference SNR) pairs.
    """
    for track_name in noisy_mix_filepaths:
      for noisy_snr, reference_snr in snr_value_pairs:
        pair_name = '{0}_{1:d}_{2:d}_SNR'.format(
            track_name, noisy_snr, reference_snr)
        # The output directory is made before the mixes are looked up.
        pair_output_path = self._MakeDir(base_output_path, pair_name)
        mixes_by_snr = noisy_mix_filepaths[track_name]
        self._AddNoiseReferenceFilesPair(
            config_name=pair_name,
            noisy_signal_filepath=mixes_by_snr[noisy_snr],
            reference_signal_filepath=mixes_by_snr[reference_snr],
            output_path=pair_output_path)

  def _AddNoiseReferenceFilesPair(self, config_name, noisy_signal_filepath,
                                  reference_signal_filepath, output_path):
    """Records one noisy-reference signal pair as absolute paths.

    Args:
      config_name: name of the configuration; must not have been added yet.
      noisy_signal_filepath: path to noisy audio track file.
      reference_signal_filepath: path to reference audio track file.
      output_path: APM output path.
    """
    assert config_name not in self._noisy_signal_filepaths
    to_absolute = os.path.abspath
    self._noisy_signal_filepaths[config_name] = to_absolute(
        noisy_signal_filepath)
    self._apm_output_paths[config_name] = to_absolute(output_path)
    self._reference_signal_filepaths[config_name] = to_absolute(
        reference_signal_filepath)

  def _MakeDir(self, base_output_path, test_data_generator_config_name):
    """Makes the output directory of a configuration and returns its path.

    The directory name is the output directory prefix followed by the
    configuration name; the directory is made via data_access.MakeDirectory().
    """
    directory_name = (
        self._output_directory_prefix + test_data_generator_config_name)
    directory_path = os.path.join(base_output_path, directory_name)
    data_access.MakeDirectory(directory_path)
    return directory_path
175
176
@TestDataGenerator.RegisterClass
class IdentityTestDataGenerator(TestDataGenerator):
  """Generator that leaves the input signal untouched.

  The input signal is used both as noisy and as reference signal. A single
  configuration, named 'default', is produced.
  """

  NAME = 'identity'

  def __init__(self, output_directory_prefix, copy_with_identity):
    super(IdentityTestDataGenerator, self).__init__(output_directory_prefix)
    # When true, the input file is copied to the cache folder and the copy is
    # used in place of the original file.
    self._copy_with_identity = copy_with_identity

  @property
  def copy_with_identity(self):
    """Whether the input signal file is copied to the cache folder."""
    return self._copy_with_identity

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Adds the 'default' pair, made of the input signal used twice.

    Args:
      input_signal_filepath: path to the clean input audio track file.
      test_data_cache_path: folder receiving the copy of the input file (only
                            used if copy_with_identity is set).
      base_output_path: base path where output is written.
    """
    config_name = 'default'
    output_path = self._MakeDir(base_output_path, config_name)

    signal_filepath = input_signal_filepath
    if self._copy_with_identity:
      # Same file name, but in the cache folder.
      cached_copy_filepath = os.path.join(
          test_data_cache_path, os.path.basename(input_signal_filepath))
      logging.info(
          'copying ' + input_signal_filepath + ' to ' + cached_copy_filepath)
      shutil.copy(input_signal_filepath, cached_copy_filepath)
      signal_filepath = cached_copy_filepath

    self._AddNoiseReferenceFilesPair(
        config_name=config_name,
        noisy_signal_filepath=signal_filepath,
        reference_signal_filepath=signal_filepath,
        output_path=output_path)
212
213
@TestDataGenerator.RegisterClass
class WhiteNoiseTestDataGenerator(TestDataGenerator):
  """Generator whose noise source is white noise.
  """

  NAME = 'white_noise'

  # Each pair is made of the SNR used for the noisy signal followed by the SNR
  # used for the reference signal. The reference always has a lower amount of
  # noise - i.e., its SNR is 10 dB higher.
  _SNR_VALUE_PAIRS = [
      [20, 30],  # Smallest noise.
      [10, 20],
      [5, 15],
      [0, 10],  # Largest noise.
  ]

  # File name of a cached mix; the placeholder is the SNR value.
  _NOISY_SIGNAL_FILENAME_TEMPLATE = 'noise_{0:d}_SNR.wav'

  def __init__(self, output_directory_prefix):
    super(WhiteNoiseTestDataGenerator, self).__init__(output_directory_prefix)

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Generates test data pairs using white noise.

    One mix is written to the cache folder for each distinct SNR value, unless
    a file with the expected name is already there. One pair is then added for
    each entry of _SNR_VALUE_PAIRS.

    Args:
      input_signal_filepath: path to the clean input audio track file.
      test_data_cache_path: folder in which the mixes are stored.
      base_output_path: base path where output is written.
    """
    utils = signal_processing.SignalProcessingUtils

    # Clean signal and noise track (the latter is derived from the former).
    clean_signal = utils.LoadWav(input_signal_filepath)
    white_noise = utils.GenerateWhiteNoise(clean_signal)

    # One mix for each unique SNR value, whichever pair it belongs to.
    unique_snrs = {
        value for snr_pair in self._SNR_VALUE_PAIRS for value in snr_pair}
    mix_filepath_by_snr = {}
    for snr_value in unique_snrs:
      mix_filepath = os.path.join(
          test_data_cache_path,
          self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(snr_value))

      # Mix and save only if the file is not in the cache yet.
      if not os.path.exists(mix_filepath):
        mixed_signal = utils.MixSignals(clean_signal, white_noise, snr_value)
        utils.SaveWav(mix_filepath, mixed_signal)

      mix_filepath_by_snr[snr_value] = mix_filepath

    # Register a noisy-reference pair for each SNR pair.
    for noisy_snr, reference_snr in self._SNR_VALUE_PAIRS:
      pair_name = '{0:d}_{1:d}_SNR'.format(noisy_snr, reference_snr)
      pair_output_path = self._MakeDir(base_output_path, pair_name)
      self._AddNoiseReferenceFilesPair(
          config_name=pair_name,
          noisy_signal_filepath=mix_filepath_by_snr[noisy_snr],
          reference_signal_filepath=mix_filepath_by_snr[reference_snr],
          output_path=pair_output_path)
276
277
278# TODO(alessiob): remove comment when class implemented.
279# @TestDataGenerator.RegisterClass
class NarrowBandNoiseTestDataGenerator(TestDataGenerator):
  """Generator that adds narrow-band noise.

  Placeholder: _Generate() does nothing yet, hence no pair is produced.
  """

  NAME = 'narrow_band_noise'

  def __init__(self, output_directory_prefix):
    super(NarrowBandNoiseTestDataGenerator, self).__init__(
        output_directory_prefix)

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Does nothing for the time being."""
    # TODO(alessiob): implement.
    return None
293
294
@TestDataGenerator.RegisterClass
class AdditiveNoiseTestDataGenerator(TestDataGenerator):
  """Generator that adds noise loops.

  All the wav files found in the noise tracks path given to the constructor
  are mixed to the clean speech with different target SNRs (hard-coded).
  """

  NAME = 'additive_noise'
  # File name of a cached mix; placeholders: noise track name and SNR value.
  _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

  # Not used inside this class; available to the callers as a default choice
  # for the noise tracks path.
  DEFAULT_NOISE_TRACKS_PATH = os.path.join(
      os.path.dirname(__file__), os.pardir, 'noise_tracks')

  # TODO(alessiob): Make the list of SNR pairs customizable.
  # Each pair is made of the SNR used for the noisy signal followed by the SNR
  # used for the reference signal. The reference always has a lower amount of
  # noise - i.e., its SNR is 10 dB higher.
  _SNR_VALUE_PAIRS = [
      [20, 30],  # Smallest noise.
      [10, 20],
      [5, 15],
      [0, 10],  # Largest noise.
  ]

  def __init__(self, output_directory_prefix, noise_tracks_path):
    """Lists the wav files available in the noise tracks path.

    Args:
      output_directory_prefix: prefix of the output directory names.
      noise_tracks_path: folder containing the noise tracks (wav files).

    Raises:
      exceptions.InitializationException: if the folder has no wav files.
    """
    super(AdditiveNoiseTestDataGenerator, self).__init__(
        output_directory_prefix)
    self._noise_tracks_path = noise_tracks_path
    # The file extension is matched ignoring the case.
    directory_entries = os.listdir(self._noise_tracks_path)
    self._noise_tracks_file_names = [
        entry for entry in directory_entries
        if entry.lower().endswith('.wav')]
    if not self._noise_tracks_file_names:
      raise exceptions.InitializationException(
          'No wav files found in the noise tracks path %s' % (
              self._noise_tracks_path))

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Generates test data pairs using environmental noise.

    For each noise track and each distinct SNR value, a mix of the (clean)
    input signal and the noise track is written to the cache folder, unless a
    file with the expected name is already there. The noise is looped when
    mixing. For each noise track and pair of SNR values, one noisy-reference
    pair is finally added.

    Raises:
      exceptions.FileNotFoundError: if a listed noise track no longer exists.
    """
    utils = signal_processing.SignalProcessingUtils
    unique_snrs = {
        value for snr_pair in self._SNR_VALUE_PAIRS for value in snr_pair}

    clean_signal = utils.LoadWav(input_signal_filepath)

    mix_filepaths_by_track = {}
    for track_filename in self._noise_tracks_file_names:
      # The track name is the file name without extension.
      track_name = os.path.splitext(track_filename)[0]
      track_filepath = os.path.join(self._noise_tracks_path, track_filename)
      if not os.path.exists(track_filepath):
        logging.error('cannot find the <%s> noise track', track_filename)
        raise exceptions.FileNotFoundError()

      track_signal = utils.LoadWav(track_filepath)

      # One mix for each unique SNR value, whichever pair it belongs to.
      mix_filepaths_by_track[track_name] = {}
      for snr_value in unique_snrs:
        mix_filepath = os.path.join(
            test_data_cache_path,
            self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(track_name, snr_value))

        # Mix and save only if the file is not in the cache yet.
        if not os.path.exists(mix_filepath):
          mixed_signal = utils.MixSignals(
              clean_signal, track_signal, snr_value,
              pad_noise=utils.MixPadding.LOOP)
          utils.SaveWav(mix_filepath, mixed_signal)

        mix_filepaths_by_track[track_name][snr_value] = mix_filepath

    # Register a noisy-reference pair for each noise track and SNR pair.
    self._AddNoiseSnrPairs(
        base_output_path, mix_filepaths_by_track, self._SNR_VALUE_PAIRS)
383
384
@TestDataGenerator.RegisterClass
class ReverberationTestDataGenerator(TestDataGenerator):
  """Generator that adds reverberation noise.

  TODO(alessiob): Make this class more generic since the impulse response can be
  anything (not just reverberation); call it e.g.,
  ConvolutionalNoiseTestDataGenerator.
  """

  NAME = 'reverberation'

  # Impulse response name -> name of the .mat file, looked up in the database
  # path given to the constructor.
  _IMPULSE_RESPONSES = {
      'lecture': 'air_binaural_lecture_0_0_1.mat',  # Long echo.
      'booth': 'air_binaural_booth_0_0_1.mat',  # Short echo.
  }
  # Maximum number of impulse response samples to keep (None: no truncation).
  _MAX_IMPULSE_RESPONSE_LENGTH = None

  # Each pair is made of the SNR used for the noisy signal followed by the SNR
  # used for the reference signal. The reference always has a lower amount of
  # noise - i.e., its SNR is 5 dB higher.
  _SNR_VALUE_PAIRS = [
      [3, 8],  # Smallest noise.
      [-3, 2],  # Largest noise.
  ]

  # File name of a cached noise track; placeholder: impulse response name.
  _NOISE_TRACK_FILENAME_TEMPLATE = '{0}.wav'
  # File name of a cached mix; placeholders: impulse response name, SNR value.
  _NOISY_SIGNAL_FILENAME_TEMPLATE = '{0}_{1:d}_SNR.wav'

  def __init__(self, output_directory_prefix, aechen_ir_database_path):
    TestDataGenerator.__init__(self, output_directory_prefix)
    # Folder containing the impulse response files of _IMPULSE_RESPONSES.
    self._aechen_ir_database_path = aechen_ir_database_path

  def _Generate(
      self, input_signal_filepath, test_data_cache_path, base_output_path):
    """Generates test data pairs using reverberation noise.

    For each impulse response, one noise track is created (or loaded from the
    cache folder if already there). For each impulse response and pair of SNR
    values, the following 2 audio tracks are created: the noisy signal and the
    reference signal. Both are obtained by mixing the (clean) input signal to
    the corresponding noise track enforcing the target SNR.

    Args:
      input_signal_filepath: path to the clean input audio track file.
      test_data_cache_path: folder in which noise tracks and mixes are stored.
      base_output_path: base path where output is written.
    """
    utils = signal_processing.SignalProcessingUtils

    # Init.
    snr_values = set([snr for pair in self._SNR_VALUE_PAIRS for snr in pair])

    # Load the input signal.
    input_signal = utils.LoadWav(input_signal_filepath)

    noisy_mix_filepaths = {}
    for impulse_response_name in self._IMPULSE_RESPONSES:
      noise_track_filepath = os.path.join(
          test_data_cache_path,
          self._NOISE_TRACK_FILENAME_TEMPLATE.format(impulse_response_name))
      try:
        # Load the noise track from the cache, if there.
        noise_signal = utils.LoadWav(noise_track_filepath)
      except exceptions.FileNotFoundError:
        # Generate (and cache) the noise track by applying the impulse
        # response to the input signal.
        impulse_response_filepath = os.path.join(
            self._aechen_ir_database_path,
            self._IMPULSE_RESPONSES[impulse_response_name])
        noise_signal = self._GenerateNoiseTrack(
            noise_track_filepath, input_signal, impulse_response_filepath)
      assert noise_signal is not None

      # Create the noisy mixes (once for each unique SNR value).
      noisy_mix_filepaths[impulse_response_name] = {}
      for snr in snr_values:
        noisy_signal_filepath = os.path.join(
            test_data_cache_path,
            self._NOISY_SIGNAL_FILENAME_TEMPLATE.format(
                impulse_response_name, snr))

        # Create and save if not done.
        if not os.path.exists(noisy_signal_filepath):
          noisy_signal = utils.MixSignals(input_signal, noise_signal, snr)
          utils.SaveWav(noisy_signal_filepath, noisy_signal)

        # Add file to the collection of mixes.
        noisy_mix_filepaths[impulse_response_name][snr] = noisy_signal_filepath

    # Add all the noise-SNR pairs.
    self._AddNoiseSnrPairs(base_output_path, noisy_mix_filepaths,
                           self._SNR_VALUE_PAIRS)

  def _GenerateNoiseTrack(self, noise_track_filepath, input_signal,
                          impulse_response_filepath):
    """Generates noise track.

    Generate a signal by convolving input_signal with the impulse response in
    impulse_response_filepath; then save to noise_track_filepath.

    Args:
      noise_track_filepath: output file path for the noise track.
      input_signal: (clean) input signal samples.
      impulse_response_filepath: impulse response file path.

    Returns:
      The signal returned by SignalProcessingUtils.ApplyImpulseResponse().
    """
    # Load impulse response (stored in the 'h_air' variable of the .mat file).
    data = scipy.io.loadmat(impulse_response_filepath)
    impulse_response = data['h_air'].flatten()

    # Truncate, and report it, only if the impulse response exceeds the limit;
    # otherwise slicing would be a no-op and the log message misleading.
    max_length = self._MAX_IMPULSE_RESPONSE_LENGTH
    if max_length is not None and len(impulse_response) > max_length:
      logging.info('truncating impulse response from %d to %d samples',
                   len(impulse_response), max_length)
      impulse_response = impulse_response[:max_length]

    # Apply impulse response.
    processed_signal = (
        signal_processing.SignalProcessingUtils.ApplyImpulseResponse(
            input_signal, impulse_response))

    # Save.
    signal_processing.SignalProcessingUtils.SaveWav(
        noise_track_filepath, processed_signal)

    return processed_signal
512