1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import logging
18import numpy
19import os
20import scipy.io.wavfile as sciwav
21
22from acts_contrib.test_utils.coex.audio_capture_device import AudioCaptureBase
23from acts_contrib.test_utils.coex.audio_capture_device import CaptureAudioOverAdb
24from acts_contrib.test_utils.coex.audio_capture_device import CaptureAudioOverLocal
25from acts_contrib.test_utils.audio_analysis_lib import audio_analysis
26from acts_contrib.test_utils.audio_analysis_lib.check_quality import quality_analysis
27
# Anomaly-detection tuning values re-exported from audio_analysis; they are
# used as the keyword defaults of AudioCaptureResult.detect_anomalies.
ANOMALY_DETECTION_BLOCK_SIZE = audio_analysis.ANOMALY_DETECTION_BLOCK_SIZE
ANOMALY_GROUPING_TOLERANCE = audio_analysis.ANOMALY_GROUPING_TOLERANCE
PATTERN_MATCHING_THRESHOLD = audio_analysis.PATTERN_MATCHING_THRESHOLD
# File-name template for analysis output; the %s placeholder is filled with a
# running file number so earlier results are not overwritten.
ANALYSIS_FILE_TEMPLATE = "audio_analysis_%s.txt"
# Bit width handed to quality_analysis for the recorded audio.
bits_per_sample = 32
33
34
def get_audio_capture_device(ad, audio_params):
    """Builds the audio capture device selected by audio_params['type'].

    Two capture back ends are supported: "AndroidDevice", which is
    constructed from the Android device and the parameters, and "Local",
    which is constructed from the parameters alone.

    Args:
        ad: Android Device object.
        audio_params: object containing variables to record audio. Must
            provide a "type" entry.

    Returns:
        Object of the audio capture device.

    Raises:
        ValueError if audio_params['type'] is not "AndroidDevice" or
            "Local".
    """
    device_type = audio_params['type']

    if device_type == 'AndroidDevice':
        return CaptureAudioOverAdb(ad, audio_params)

    if device_type == 'Local':
        return CaptureAudioOverLocal(audio_params)

    # Anything else is a configuration error.
    raise ValueError('Unrecognized audio capture device %s' % device_type)
63
64
class FileNotFound(Exception):
    """Raised when the recorded audio file does not exist at its path."""
67
68
class AudioCaptureResult(AudioCaptureBase):
    """Wraps a recorded audio file and exposes analysis helpers for it."""

    def __init__(self, path, audio_params=None):
        """Initializes Audio Capture Result class.

        Args:
            path: Path of audio capture result.
            audio_params: optional mapping of recording parameters. When it is
                truthy the file at path is trimmed immediately using its
                'trim_start' and 'trim_end' entries; 'sample_rate' and
                'channel' are read later by audio_quality_analysis.
        """
        super().__init__()
        self.path = path
        self.audio_params = audio_params
        # NOTE(review): log_path is not assigned in this class; presumably it
        # is provided by AudioCaptureBase -- confirm.
        self.analysis_path = os.path.join(self.log_path,
                                          ANALYSIS_FILE_TEMPLATE)
        if self.audio_params:
            self._trim_wave_file()

    def THDN(self, win_size=None, step_size=None, q=1, freq=None):
        """Calculate THD+N value for most recently recorded file.

        Args:
            win_size: analysis window size (must be less than length of
                signal). Used with step size to analyze signal piece by
                piece. If not specified, entire signal will be analyzed.
            step_size: number of samples to move window per-analysis. If not
                specified, entire signal will be analyzed.
            q: quality factor for the notch filter used to remove fundamental
                frequency from signal to isolate noise.
            freq: the fundamental frequency to remove from the signal. If none,
                the fundamental frequency will be determined using FFT.
        Returns:
            channel_results (list): THD+N value for each channel's signal.
                List index corresponds to channel index.
        """
        # Windowed analysis needs both values; otherwise analyze the whole
        # signal in one pass.
        if not (win_size and step_size):
            return audio_analysis.get_file_THDN(filename=self.path,
                                                q=q,
                                                freq=freq)
        return audio_analysis.get_file_max_THDN(filename=self.path,
                                                step_size=step_size,
                                                window_size=win_size,
                                                q=q,
                                                freq=freq)

    def detect_anomalies(self,
                         freq=None,
                         block_size=ANOMALY_DETECTION_BLOCK_SIZE,
                         threshold=PATTERN_MATCHING_THRESHOLD,
                         tolerance=ANOMALY_GROUPING_TOLERANCE):
        """Detect anomalies in most recently recorded file.

        An anomaly is defined as a sample in a recorded sine wave that differs
        by at least the value defined by the threshold parameter from a golden
        generated sine wave of the same amplitude, sample rate, and frequency.

        Args:
            freq (int|float): fundamental frequency of the signal. All other
                frequencies are noise. If None, will be calculated with FFT.
            block_size (int): the number of samples to analyze at a time in the
                anomaly detection algorithm.
            threshold (float): the threshold of the correlation index to
                determine if two sample values match.
            tolerance (float): the sample tolerance for anomaly time values
                to be grouped as the same anomaly
        Returns:
            channel_results (list): anomaly durations for each channel's signal.
                List index corresponds to channel index.
        """
        return audio_analysis.get_file_anomaly_durations(filename=self.path,
                                                         freq=freq,
                                                         block_size=block_size,
                                                         threshold=threshold,
                                                         tolerance=tolerance)

    @property
    def analysis_fileno(self):
        """Returns the file number to dump audio analysis results.

        This is the smallest non-negative integer for which the analysis
        file name built from analysis_path does not exist yet.
        """
        counter = 0
        while os.path.exists(self.analysis_path % counter):
            counter += 1
        return counter

    def audio_quality_analysis(self):
        """Measures audio quality based on the audio file given as input.

        Reads 'sample_rate' and 'channel' from audio_params. Any failure of
        the analysis itself is logged rather than raised, so the returned
        path may refer to a file that was not written.

        Returns:
            Path of the analysis output file.

        Raises:
            FileNotFound: if the recorded file does not exist.
        """
        analysis_path = self.analysis_path % self.analysis_fileno
        if not os.path.exists(self.path):
            raise FileNotFound("Recorded file not found")
        try:
            quality_analysis(filename=self.path,
                             output_file=analysis_path,
                             bit_width=bits_per_sample,
                             rate=self.audio_params["sample_rate"],
                             channel=self.audio_params["channel"],
                             spectral_only=False)
        except Exception as err:
            # Best effort: record the failure (with traceback) and still
            # hand the path back to the caller.
            logging.exception("Failed to analyze raw audio: %s", err)
        return analysis_path

    def _trim_wave_file(self):
        """Trims the recorded wave file to the configured window.

        The recording at self.path is renamed with an 'original_' prefix in
        the same directory. The samples between audio_params['trim_start']
        and audio_params['trim_end'] (each multiplied by the sample rate read
        from the file) are then written back to self.path.

        Raises:
            KeyError: if 'trim_start' or 'trim_end' is missing; this is
                checked before the recording is renamed.
        """
        # Read the parameters first so a bad config leaves the file untouched.
        trim_start = self.audio_params['trim_start']
        trim_end = self.audio_params['trim_end']

        original_record_file_name = 'original_' + os.path.basename(self.path)
        original_record_file_path = os.path.join(os.path.dirname(self.path),
                                                 original_record_file_name)
        os.rename(self.path, original_record_file_path)
        fs, data = sciwav.read(original_record_file_path)

        num_samples = data.shape[0]
        # Slice bounds must be integers; fractional trim times would
        # otherwise produce float indices and fail. The slice end is
        # exclusive, so clamp to num_samples to keep the final sample.
        start_read = min(int(round(trim_start * fs)), num_samples)
        end_read = min(int(round(trim_end * fs)), num_samples)

        # Slicing the array directly keeps the original dtype and shape,
        # even when the selected window is empty.
        sciwav.write(self.path, fs, data[start_read:end_read])
192