1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Stream music through connected device from phone test implementation."""
17import acts
18import os
19import shutil
20import time
21
22import acts_contrib.test_utils.coex.audio_test_utils as atu
23import acts_contrib.test_utils.bt.bt_test_utils as btutils
24from acts import asserts
25from acts_contrib.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory
26from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
27
28PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music'
29INIT_ATTEN = 0
30WAIT_TIME = 1
31
32
33class A2dpBaseTest(BluetoothBaseTest):
34    """Stream audio file over desired Bluetooth codec configurations.
35
36    Audio file should be a sine wave. Other audio files will not work for the
37    test analysis metrics.
38
39    Device under test is Android phone, connected to headset with a controller
40    that can generate a BluetoothHandsfreeAbstractDevice from test_utils.
41    abstract_devices.bluetooth_handsfree_abstract_device.
42    BuetoothHandsfreeAbstractDeviceFactory.
43    """
44    def setup_class(self):
45
46        super().setup_class()
47        self.dut = self.android_devices[0]
48        req_params = ['audio_params', 'music_files']
49        #'audio_params' is a dict, contains the audio device type, audio streaming
50        #settings such as volumn, duration, audio recording parameters such as
51        #channel, sampling rate/width, and thdn parameters for audio processing
52        self.unpack_userparams(req_params)
53        # Find music file and push it to the dut
54        music_src = self.music_files[0]
55        music_dest = PHONE_MUSIC_FILE_DIRECTORY
56        success = self.dut.push_system_file(music_src, music_dest)
57        if success:
58            self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY,
59                                           os.path.basename(music_src))
60        # Initialize media_control class
61        self.media = btutils.MediaControlOverSl4a(self.dut, self.music_file)
62        # Set attenuator to minimum attenuation
63        if hasattr(self, 'attenuators'):
64            self.attenuator = self.attenuators[0]
65            self.attenuator.set_atten(INIT_ATTEN)
66        # Create the BTOE(Bluetooth-Other-End) device object
67        bt_devices = self.user_params.get('bt_devices', [])
68        if bt_devices:
69            attr, idx = bt_devices.split(':')
70            self.bt_device_controller = getattr(self, attr)[int(idx)]
71            self.bt_device = bt_factory().generate(self.bt_device_controller)
72        else:
73            self.log.error('No BT devices config is provided!')
74
75    def teardown_class(self):
76
77        super().teardown_class()
78        if hasattr(self, 'media'):
79            self.media.stop()
80        if hasattr(self, 'attenuator'):
81            self.attenuator.set_atten(INIT_ATTEN)
82        self.dut.droid.bluetoothFactoryReset()
83        self.bt_device.reset()
84        self.bt_device.power_off()
85        btutils.disable_bluetooth(self.dut.droid)
86
87    def setup_test(self):
88
89        super().setup_test()
90        # Initialize audio capture devices
91        self.audio_device = atu.get_audio_capture_device(
92            self.bt_device_controller, self.audio_params)
93        # Reset BT to factory defaults
94        self.dut.droid.bluetoothFactoryReset()
95        self.bt_device.reset()
96        self.bt_device.power_on()
97        btutils.enable_bluetooth(self.dut.droid, self.dut.ed)
98        btutils.connect_phone_to_headset(self.dut, self.bt_device, 60)
99        vol = self.dut.droid.getMaxMediaVolume() * self.audio_params['volume']
100        self.dut.droid.setMediaVolume(0)
101        time.sleep(1)
102        self.dut.droid.setMediaVolume(int(vol))
103
104    def teardown_test(self):
105
106        super().teardown_test()
107        self.dut.droid.bluetoothFactoryReset()
108        self.media.stop()
109        # Set Attenuator to the initial attenuation
110        if hasattr(self, 'attenuator'):
111            self.attenuator.set_atten(INIT_ATTEN)
112        self.bt_device.reset()
113        self.bt_device.power_off()
114        btutils.disable_bluetooth(self.dut.droid)
115
116    def play_and_record_audio(self, duration):
117        """Play and record audio for a set duration.
118
119        Args:
120            duration: duration in seconds for music playing
121        Returns:
122            audio_captured: captured audio file path
123        """
124
125        self.log.info('Play and record audio for {} second'.format(duration))
126        self.media.play()
127        proc = self.audio_device.start()
128        time.sleep(duration + WAIT_TIME)
129        proc.kill()
130        time.sleep(WAIT_TIME)
131        proc.kill()
132        audio_captured = self.audio_device.stop()
133        self.media.stop()
134        self.log.info('Audio play and record stopped')
135        asserts.assert_true(audio_captured, 'Audio not recorded')
136        return audio_captured
137
138    def _get_bt_link_metrics(self):
139        """Get bt link metrics such as rssi and tx pwls.
140
141        Returns:
142            rssi_master: master rssi
143            pwl_master: master tx pwl
144            rssi_slave: slave rssi
145        """
146
147        self.media.play()
148        # Get master rssi and power level
149        rssi_master = btutils.get_bt_metric(self.dut)['rssi']
150        pwl_master = btutils.get_bt_metric(self.dut)['pwlv']
151        # Get slave rssi if possible
152        if isinstance(self.bt_device_controller,
153                      acts.controllers.android_device.AndroidDevice):
154            rssi_slave = btutils.get_bt_rssi(self.bt_device_controller)
155        else:
156            rssi_slave = None
157        self.media.stop()
158        return [rssi_master, pwl_master, rssi_slave]
159
160    def run_thdn_analysis(self, audio_captured, tag):
161        """Calculate Total Harmonic Distortion plus Noise for latest recording.
162
163        Store result in self.metrics.
164
165        Args:
166            audio_captured: the captured audio file
167        Returns:
168            thdn: thdn value in a list
169        """
170        # Calculate Total Harmonic Distortion + Noise
171        audio_result = atu.AudioCaptureResult(audio_captured,
172                                              self.audio_params)
173        thdn = audio_result.THDN(**self.audio_params['thdn_params'])
174        file_name = tag + os.path.basename(audio_result.path)
175        file_new = os.path.join(os.path.dirname(audio_result.path), file_name)
176        shutil.copyfile(audio_result.path, file_new)
177        for ch_no, t in enumerate(thdn):
178            self.log.info('THD+N for channel %s: %.4f%%' % (ch_no, t * 100))
179        return thdn
180
181    def run_anomaly_detection(self, audio_captured):
182        """Detect anomalies in latest recording.
183
184        Store result in self.metrics.
185
186        Args:
187            audio_captured: the captured audio file
188        Returns:
189            anom: anom detected in the captured file
190        """
191        # Detect Anomalies
192        audio_result = atu.AudioCaptureResult(audio_captured)
193        anom = audio_result.detect_anomalies(
194            **self.audio_params['anomaly_params'])
195        num_anom = 0
196        for ch_no, anomalies in enumerate(anom):
197            if anomalies:
198                for anomaly in anomalies:
199                    num_anom += 1
200                    start, end = anomaly
201                    self.log.warning(
202                        'Anomaly on channel {} at {}:{}. Duration '
203                        '{} sec'.format(ch_no, start // 60, start % 60,
204                                        end - start))
205        else:
206            self.log.info('%i anomalies detected.' % num_anom)
207        return anom
208