# Lint as: python3
"""Utils for bluetooth audio testing."""

import logging as log
import math
import os
from typing import Optional
import numpy as np
from scipy import signal as scipy_signal
from scipy.io import wavfile
# Internal import
# Internal import

# Dict keys of the THD+N analysis result
_THDN_KEY = 'thd+n'
_START_TIME_KEY = 'start_time'
_END_TIME_KEY = 'end_time'


def generate_sine_wave_to_device(
    device,
    pushed_file_path='/sdcard/Music',
    frequency=480,
    channel=2,
    sample_rate=48000,
    sample_format=16,
    duration_sec=10):
  """Generates a fixed frequency sine wave file and pushes it to the device.

  The wave file is synthesized into the Mobly log directory of the device
  (`device.log_path`) by running the command referenced by
  `audio_processor.AudioProcessor.SOX`, and is then pushed to the device
  storage through adb. The output file name follows this pattern:
    sine_480hz_2ch_48000rate_16bit_10sec.wav

  Args:
    device: AndroidDevice, Mobly Android controller class.
    pushed_file_path: string, the directory on the device storage which the
      wave file is pushed to. E.g. /sdcard/Music
    frequency: int, fixed frequency in Hz.
    channel: int, number of channels.
    sample_rate: int, sampling rate in Hz.
    sample_format: int, sampling format in bit.
    duration_sec: int, audio duration in second.

  Returns:
    device_storage_path: string, the wave file on the device storage.
    mobly_directory_path: string, the wave file on the Mobly device directory.
  """
  file_name = 'sine_%dhz_%dch_%drate_%dbit_%dsec.wav' % (
      frequency, channel, sample_rate, sample_format, duration_sec)
  mobly_directory_path = os.path.join(device.log_path, file_name)
  exit_status = os.system(
      '%s -n -c %d -r %d -b %d %s synth %d sine %d' %
      (audio_processor.AudioProcessor.SOX, channel, sample_rate,
       sample_format, mobly_directory_path, duration_sec, frequency))
  # A failed synthesis used to go unnoticed until the adb push failed; surface
  # it in the log without changing the control flow callers rely on.
  if exit_status != 0:
    log.warning('Sine wave generation exited with status %s for %s.',
                exit_status, mobly_directory_path)
  device.adb.push([mobly_directory_path, pushed_file_path])
  device_storage_path = os.path.join(pushed_file_path, file_name)
  return device_storage_path, mobly_directory_path


def measure_audio_mos(recorded_audio_file, reference_audio_file):
  """Measures mean opinion score (MOS) of a recorded audio.

  This function uses the module of A/V Analysis Service to measure MOS:
  Internal reference

  Args:
    recorded_audio_file: string, the recorded audio file to be measured.
    reference_audio_file: string, the reference audio file for comparison.

  Returns:
    Float which is the mean opinion score of the recorded audio, or 0.0 when
    the analyzer produces no results.
  """
  results = audio_calculator.AudioAnalyzer().Analyze(reference_audio_file,
                                                     recorded_audio_file)
  # Returns 0.0 if the results fails to be generated.
  if not results:
    log.warning('Failed to generate the audio analysis results.')
    return 0.0
  return results[0].mos


def measure_fundamental_frequency(signal, sample_rate):
  """Measures fundamental frequency of a signal.

  The fundamental is taken as the frequency of the FFT bin with the largest
  magnitude, so the resolution is `sample_rate / len(signal)` Hz.

  Args:
    signal: An 1-D array representing the signal data.
    sample_rate: int, sample rate of the signal.

  Returns:
    Float representing the fundamental frequency.
  """
  peak_bin = np.argmax(np.abs(np.fft.rfft(signal)))
  return sample_rate * (peak_bin / len(signal))


def measure_rms(signal):
  """Measures Root Mean Square (RMS) of a signal.

  Args:
    signal: An 1-D array representing the signal data.

  Returns:
    Float representing the root mean square.
  """
  return np.sqrt(np.mean(np.absolute(signal)**2))


def measure_thdn(signal, sample_rate, q, frequency=None):
  """Measures Total Harmonic Distortion + Noise (THD+N) of a signal.

  The input array is never modified: the DC offset is removed from a copy.
  This matters for callers that pass overlapping views of one buffer.

  Args:
    signal: An 1-D array representing the signal data.
    sample_rate: int, sample rate of the signal.
    q: float, quality factor for the notch filter.
    frequency: float, fundamental frequency of the signal. All other frequencies
      are noise. If not specified (None or 0), will be calculated using FFT.

  Returns:
    Float representing THD+N ratio calculated from the ratio of RMS of pure
    harmonics and noise signal to RMS of original signal.
  """
  # Removes the DC offset on a copy. An in-place `signal -= mean` would shift
  # the caller's samples (corrupting overlapping analysis windows) and would
  # fail outright on integer arrays.
  samples = np.asarray(signal)
  centered = samples - np.mean(samples)
  # Applies a Blackman-Harris window to the signal.
  windowed = centered * scipy_signal.blackmanharris(len(centered))
  # Finds the fundamental frequency to remove if not specified.
  if not frequency:
    frequency = measure_fundamental_frequency(windowed, sample_rate)
  # Creates a notch filter to get noise from the signal. The notch frequency
  # is normalized to the Nyquist frequency (sample_rate / 2).
  wo = frequency / (sample_rate / 2)
  b, a = scipy_signal.iirnotch(wo, q)
  noise = scipy_signal.lfilter(b, a, windowed)
  return measure_rms(noise) / measure_rms(windowed)


def measure_audio_thdn_per_window(
    audio_file,
    thdn_threshold,
    step_size,
    window_size,
    q=1.0,
    frequency=None):
  """Measures Total Harmonic Distortion + Noise (THD+N) of an audio file.

  This function is used to capture audio glitches from a recorded audio file,
  and the audio file shall record a fixed frequency sine wave.

  Args:
    audio_file: A .wav file to be measured.
    thdn_threshold: float, a THD+N threshold used to compare with the measured
      THD+N for every windows. If THD+N of a window is greater than the
      threshold, will record this to results.
    step_size: int, number of samples to move the window by for each analysis.
    window_size: int, number of samples to analyze each time.
    q: float, quality factor for the notch filter.
    frequency: float, fundamental frequency of the signal. All other frequencies
      are noise. If not specified, will be calculated using FFT.

  Returns:
    List containing each result of channels. Like the following structure:
    ```
    [
      [ # result of channel 1
        {
          "thd+n": <float>, # THD+N of a window
          "start_time": <float>, # start time of a window
          "end_time": <float>, # end time of a window
        },
        ...,
      ],
      [...,] # result of channel 2
      ...,
    ]
    ```

  Raises:
    ValueError: If step_size or window_size is not greater than 0.
  """
  if step_size <= 0:
    raise ValueError('step_size shall be greater than 0.')
  if window_size <= 0:
    raise ValueError('window_size shall be greater than 0.')
  sample_rate, wave_data = wavfile.read(audio_file)
  wave_data = wave_data.astype('float64')
  # A mono file is read as a 1-D array; a multi-channel file is read as
  # (samples, channels), so transposing yields one row per channel.
  if len(wave_data.shape) == 1:
    channel_signals = (wave_data,)
  else:
    channel_signals = wave_data.transpose()
  # Collects the result for each channels.
  results = []
  for signal in channel_signals:
    current_position = 0
    channel_result = []
    # Only full windows are analyzed; a trailing partial window is skipped.
    while current_position + window_size <= len(signal):
      window_end = current_position + window_size
      thdn = measure_thdn(
          signal=signal[current_position:window_end],
          sample_rate=sample_rate,
          q=q,
          frequency=frequency)
      if thdn > thdn_threshold:
        channel_result.append({
            _THDN_KEY: thdn,
            _START_TIME_KEY: current_position / sample_rate,
            _END_TIME_KEY: window_end / sample_rate,
        })
      current_position += step_size
    results.append(channel_result)
  return results


def get_audio_maximum_thdn(
    audio_file: str,
    step_size: int,
    window_size: int,
    q: float = 1.0,
    frequency: Optional[float] = None) -> float:
  """Gets maximum THD+N from each audio sample with specified window size.

  Args:
    audio_file: A .wav file to be measured.
    step_size: Number of samples to move the window by for each analysis.
    window_size: Number of samples to analyze each time.
    q: Quality factor for the notch filter.
    frequency: Fundamental frequency of the signal. All other frequencies
      are noise. If not specified, will be calculated using FFT.

  Returns:
    Float representing the maximum THD+N for the audio file, or -1.0 when no
    window was recorded (e.g. the file is shorter than one window).
  """
  # Gets all analysis results. A threshold of -1.0 keeps every window, since
  # a THD+N ratio is never negative.
  total_results = measure_audio_thdn_per_window(
      audio_file=audio_file,
      thdn_threshold=-1.0,
      step_size=step_size,
      window_size=window_size,
      q=q,
      frequency=frequency)

  no_result = {_THDN_KEY: -1.0, _START_TIME_KEY: -1, _END_TIME_KEY: -1}
  all_windows = [
      result for channel_results in total_results for result in channel_results
  ]
  # max() returns the first of equal maxima, matching a strict `>` scan.
  max_thdn_result = max(
      all_windows, key=lambda result: result[_THDN_KEY], default=no_result)

  log.info('Maximum THD+N result: %s', max_thdn_result)
  return max_thdn_result[_THDN_KEY]


def convert_thdn_percent_to_decibels(thdn_percent: float) -> float:
  """Converts THD+N percentage to decibels (dB).

  Args:
    thdn_percent: THD+N in percent. E.g. 0.001.

  Returns:
    THD+N in decibels.

  Raises:
    ValueError: If thdn_percent is not greater than 0.
  """
  # math.log10 is more accurate than math.log(x, 10), which can be off by one
  # ulp even for exact powers of ten.
  return math.log10(thdn_percent / 100) * 20


def trim_audio(audio_file: str,
               duration_sec: float,
               start_time_sec: float = 0.0) -> str:
  """Trims an audio file with a specific start time and duration.

  Generates an output file next to the input file, named with this format:
    `<input file name>_<start time sec>-<end time sec>.<input file type>`
  where the end time is `start_time_sec + duration_sec`.

  Args:
    audio_file: string, an audio file to be trimmed.
    duration_sec: float, the duration of the output file in seconds.
    start_time_sec: float, the start time of the audio file to be trimmed in
      seconds. Default value is 0.0 second if not specified.

  Returns:
    String, the output file of the same path of the origin file.
  """
  file_path, file_name = os.path.split(audio_file)
  file_name, file_ext = os.path.splitext(file_name)
  output_file_name = '%s_%s-%s%s' % (
      file_name,
      start_time_sec,
      (start_time_sec + duration_sec),
      file_ext)
  output_file = os.path.join(file_path, output_file_name)
  processor = audio_processor.AudioProcessor()
  processor.TrimAudio(
      input_file=audio_file,
      output_file=output_file,
      duration=duration_sec,
      start=start_time_sec)
  return output_file