1# Lint as: python3
2"""Utils for bluetooth audio testing."""
3
import logging as log
import math
import os
import posixpath
import subprocess
from typing import Optional
import numpy as np
from scipy import signal as scipy_signal
from scipy.io import wavfile
11# Internal import
12# Internal import
13
# Dict keys of the THD+N analysis result. Each per-window result dict is built
# by `measure_audio_thdn_per_window` and read by `get_audio_maximum_thdn`.
_THDN_KEY = 'thd+n'  # Measured THD+N of the window.
_START_TIME_KEY = 'start_time'  # Window start: sample offset / sample rate.
_END_TIME_KEY = 'end_time'  # Window end: sample offset / sample rate.
18
19
def generate_sine_wave_to_device(
    device,
    pushed_file_path='/sdcard/Music',
    frequency=480,
    channel=2,
    sample_rate=48000,
    sample_format=16,
    duration_sec=10):
  """Generates a fixed frequency sine wave file and pushes it to the device.

  Synthesizes the wave file with SoX into the Mobly device log directory, then
  pushes it to the device storage. The output file name follows this example:
    sine_480hz_2ch_48000rate_16bit_10sec.wav

  SoX is launched with an argument list (no shell), so paths containing spaces
  or shell metacharacters are passed through unmodified. If SoX cannot be
  launched or exits with a non-zero status, the failure is logged and the push
  is still attempted.

  Args:
    device: AndroidDevice, Mobly Android controller class.
    pushed_file_path: string, the wave file path which is pushed to the device
        storage. E.g. /sdcard/Music
    frequency: int, fixed frequency in Hz.
    channel: int, number of channels.
    sample_rate: int, sampling rate in Hz.
    sample_format: int, sampling format in bit.
    duration_sec: int, audio duration in second.

  Returns:
    device_storage_path: string, the wave file on the device storage.
    mobly_directory_path: string, the wave file on the Mobly device directory.
  """
  file_name = 'sine_%dhz_%dch_%drate_%dbit_%dsec.wav' % (
      frequency, channel, sample_rate, sample_format, duration_sec)
  mobly_directory_path = os.path.join(device.log_path, file_name)
  # Numeric options are rendered with %d, exactly as the former shell string
  # did, so non-integer inputs are truncated the same way.
  sox_command = [
      audio_processor.AudioProcessor.SOX,
      '-n',
      '-c', '%d' % channel,
      '-r', '%d' % sample_rate,
      '-b', '%d' % sample_format,
      mobly_directory_path,
      'synth', '%d' % duration_sec,
      'sine', '%d' % frequency,
  ]
  try:
    subprocess.run(sox_command, check=True)
  except (OSError, subprocess.CalledProcessError):
    # Surface the failure in the log instead of silently ignoring the exit
    # status; execution continues so existing callers see no new exception.
    log.exception('Failed to generate %s with SoX.', mobly_directory_path)
  device.adb.push([mobly_directory_path, pushed_file_path])
  # The device file system is POSIX regardless of the host OS, so the on-device
  # path must not be joined with the host path separator.
  device_storage_path = posixpath.join(pushed_file_path, file_name)
  return device_storage_path, mobly_directory_path
57
58
def measure_audio_mos(recorded_audio_file, reference_audio_file):
  """Computes the mean opinion score (MOS) of a recorded audio file.

  Delegates the analysis to `audio_calculator.AudioAnalyzer`, passing the
  reference file first and the recorded file second.

  Args:
    recorded_audio_file: string, the recorded audio file to be measured.
    reference_audio_file: string, the reference audio file for comparison.

  Returns:
    Float, the `mos` attribute of the first analysis result, or 0.0 when the
    analyzer produces no results.
  """
  analyzer = audio_calculator.AudioAnalyzer()
  analysis_results = analyzer.Analyze(reference_audio_file,
                                      recorded_audio_file)
  if analysis_results:
    return analysis_results[0].mos
  # No result was generated: report it and fall back to the lowest score.
  log.warning('Failed to generate the audio analysis results.')
  return 0.0
79
80
def measure_fundamental_frequency(signal, sample_rate):
  """Estimates the fundamental frequency of a signal from its FFT peak.

  The estimate is the center frequency of the real-FFT bin with the largest
  magnitude, so its resolution is sample_rate / len(signal).

  Args:
    signal: An 1-D array representing the signal data.
    sample_rate: int, sample rate of the signal.

  Returns:
    Float representing the fundamental frequency.
  """
  magnitude_spectrum = np.abs(np.fft.rfft(signal))
  peak_bin = np.argmax(magnitude_spectrum)
  # Bin index -> fraction of the sample rate -> frequency in Hz.
  return sample_rate * (peak_bin / len(signal))
92
93
def measure_rms(signal):
  """Measures Root Mean Square (RMS) of a signal.

  Integer (and boolean) input is promoted to float64 before squaring. Raw PCM
  data such as int16 would otherwise overflow in the square (and in the
  absolute value of the most negative sample), producing NaN or a wrong value.
  Floating point and complex input keep their dtype.

  Args:
    signal: An 1-D array representing the signal data.

  Returns:
    Float representing the root mean square.
  """
  samples = np.asarray(signal)
  if samples.dtype.kind in 'biu':
    # Fixed-width integers wrap around silently when squared.
    samples = samples.astype(np.float64)
  return np.sqrt(np.mean(np.absolute(samples)**2))
104
105
def measure_thdn(signal, sample_rate, q, frequency=None):
  """Measures Total Harmonic Distortion + Noise (THD+N) of a signal.

  The input is not modified: the DC offset is removed on a copy. This matters
  when the caller passes overlapping views of one buffer, since subtracting the
  mean in place would shift samples shared with later windows.

  Args:
    signal: An 1-D array representing the signal data.
    sample_rate: int, sample rate of the signal.
    q: float, quality factor for the notch filter.
    frequency: float, fundamental frequency of the signal. All other frequencies
        are noise. If not specified (or falsy), will be calculated using FFT.

  Returns:
    Float representing THD+N ratio calculated from the ratio of RMS of pure
        harmonics and noise signal (the windowed signal with the fundamental
        notched out) to RMS of the windowed signal.
  """
  # Removes the DC offset. The subtraction allocates a new array, so the
  # caller's data stays untouched and integer input is accepted.
  samples = np.asarray(signal)
  samples = samples - np.mean(samples)
  # Applies a Blackman-Harris window. The function is taken from
  # `scipy.signal.windows` because recent SciPy releases no longer expose the
  # window functions directly on `scipy.signal`.
  windowed = samples * scipy_signal.windows.blackmanharris(len(samples))
  # Finds the fundamental frequency to remove if not specified.
  if not frequency:
    frequency = measure_fundamental_frequency(windowed, sample_rate)
  # Creates a notch filter at the fundamental, expressed as a fraction of the
  # Nyquist frequency, to keep only harmonics and noise.
  wo = frequency / (sample_rate / 2)
  b, a = scipy_signal.iirnotch(wo, q)
  noise = scipy_signal.lfilter(b, a, windowed)
  return measure_rms(noise) / measure_rms(windowed)
132
133
def measure_audio_thdn_per_window(
    audio_file,
    thdn_threshold,
    step_size,
    window_size,
    q=1.0,
    frequency=None):
  """Scans a .wav file and reports the windows whose THD+N is too high.

  Intended for spotting audio glitches in a recording of a fixed frequency
  sine wave. Every channel is scanned independently with a sliding window;
  only complete windows are analyzed, so trailing samples that do not fill a
  whole window are ignored.

  Args:
    audio_file: A .wav file to be measured.
    thdn_threshold: float, a THD+N threshold used to compare with the measured
        THD+N for every windows. Only windows whose THD+N is strictly greater
        than the threshold are recorded in the results.
    step_size: int, number of samples to move the window by for each analysis.
    window_size: int, number of samples to analyze each time.
    q: float, quality factor for the notch filter.
    frequency: float, fundamental frequency of the signal. All other frequencies
        are noise. If not specified, will be calculated using FFT.

  Returns:
    List with one entry per channel, each entry being the list of recorded
    windows of that channel:
        ```
          [
              [  # result of channel 1
                  {
                      "thd+n": <float>,  # THD+N of a window
                      "start_time": <float>,  # start time of a window
                      "end_time": <float>,  # end time of a window
                  },
                  ...,
              ],
              [...,]  # result of channel 2
              ...,
          ]
        ```

  Raises:
    ValueError: If step_size or window_size is not greater than 0.
  """
  if step_size <= 0:
    raise ValueError('step_size shall be greater than 0.')
  if window_size <= 0:
    raise ValueError('window_size shall be greater than 0.')

  sample_rate, wave_data = wavfile.read(audio_file)
  wave_data = wave_data.astype('float64')
  # Mono data is 1-D; multi-channel data is (samples, channels) and is
  # transposed so that iterating yields one channel at a time.
  is_mono = len(wave_data.shape) == 1
  channel_signals = (wave_data,) if is_mono else wave_data.transpose()

  results = []
  for channel_samples in channel_signals:
    flagged_windows = []
    window_start = 0
    window_end = window_size
    while window_end <= len(channel_samples):
      thdn = measure_thdn(
          signal=channel_samples[window_start:window_end],
          sample_rate=sample_rate,
          q=q,
          frequency=frequency)
      if thdn > thdn_threshold:
        flagged_windows.append({
            _THDN_KEY: thdn,
            _START_TIME_KEY: window_start / sample_rate,
            _END_TIME_KEY: window_end / sample_rate,
        })
      window_start += step_size
      window_end = window_start + window_size
    results.append(flagged_windows)
  return results
207
208
def get_audio_maximum_thdn(
    audio_file: str,
    step_size: int,
    window_size: int,
    q: float = 1.0,
    frequency: Optional[float] = None) -> float:
  """Finds the largest per-window THD+N across all channels of an audio file.

  Args:
    audio_file: A .wav file to be measured.
    step_size: Number of samples to move the window by for each analysis.
    window_size: Number of samples to analyze each time.
    q: Quality factor for the notch filter.
    frequency: Fundamental frequency of the signal. All other frequencies
        are noise. If not specified, will be calculated using FFT.

  Returns:
    Float representing the maximum THD+N for the audio file, or -1.0 when no
    window result exceeds the initial placeholder (e.g. no window was
    analyzed).
  """
  # A negative threshold makes the per-window scan report every window.
  per_channel_results = measure_audio_thdn_per_window(
      audio_file=audio_file,
      thdn_threshold=-1.0,
      step_size=step_size,
      window_size=window_size,
      q=q,
      frequency=frequency)

  # Placeholder that any reported window replaces on strict comparison.
  worst_window = {_THDN_KEY: -1.0, _START_TIME_KEY: -1, _END_TIME_KEY: -1}
  every_window = (
      window_result
      for channel_windows in per_channel_results
      for window_result in channel_windows)
  for window_result in every_window:
    if window_result[_THDN_KEY] > worst_window[_THDN_KEY]:
      worst_window = window_result

  log.info('Maximum THD+N result: %s', worst_window)
  return worst_window[_THDN_KEY]
245
246
def convert_thdn_percent_to_decibels(thdn_percent: float) -> float:
  """Converts THD+N percentage to decibels (dB).

  Computes 20 * log10(thdn_percent / 100). `math.log10` is used instead of
  `math.log(x, 10)` because the latter is inexact for powers of ten (e.g. 0.1
  percent would yield -59.99999999999999 instead of -60.0).

  Args:
    thdn_percent: THD+N in percent. E.g. 0.001. Must be greater than 0.

  Returns:
    THD+N in decibels.

  Raises:
    ValueError: If thdn_percent is not greater than 0, for which the logarithm
        is undefined.
  """
  if thdn_percent <= 0:
    raise ValueError(
        'thdn_percent shall be greater than 0, got %s.' % thdn_percent)
  return math.log10(thdn_percent / 100) * 20
257
258
def trim_audio(audio_file: str,
               duration_sec: float,
               start_time_sec: float = 0.0) -> str:
  """Trims an audio file to a segment given by start time and duration.

  The trimmed audio is written next to the input file, with a name of the
  following format:
    `<input file name>_<start time sec>-<end time sec>.<input file type>`
  where the end time is the start time plus the duration.

  Args:
    audio_file: string, an audio file to be trimmed.
    duration_sec: float, the duration of the output file in seconds.
    start_time_sec: float, the start time of the audio file to be trimmed in
        seconds. Default value is 0.0 second if not specified.

  Returns:
    String, the output file in the same directory as the original file.
  """
  directory, base_name = os.path.split(audio_file)
  stem, extension = os.path.splitext(base_name)
  end_time_sec = start_time_sec + duration_sec
  output_file = os.path.join(
      directory,
      '%s_%s-%s%s' % (stem, start_time_sec, end_time_sec, extension))
  audio_processor.AudioProcessor().TrimAudio(
      input_file=audio_file,
      output_file=output_file,
      duration=duration_sec,
      start=start_time_sec)
  return output_file
291