1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import random 7import re 8import time 9 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.common_lib.cros import power_cycle_usb_util 12from autotest_lib.client.common_lib.cros.cfm import cras_node_collector 13from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices 14from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector 15from autotest_lib.server.cros.cfm import cfm_base_test 16 17 18# CFMs have a base volume level threshold. Setting the level below 2 19# is interpreted by the CFM as 0. 20CFM_VOLUME_LEVEL_LOWER_LIMIT = 2 21CFM_VOLUME_LEVEL_UPPER_LIMIT = 100 22JABRA = cfm_usb_devices.JABRA_SPEAK_410 23DUAL_SPEAKER_DEVICE_NAME = JABRA.product 24TIMEOUT_SECS = 10 25 26 27class enterprise_CFM_DualSpeaker(cfm_base_test.CfmBaseTest): 28 """ 29 Tests that the following functionality works on CfM enrolled devices: 30 31 1. Mixer mute/umute state should be in sync between CfM and 2 speakers. 32 2. Volume of two speakers should be in sync with the volume set by CfM. 33 3. When muting/unmuting speakers from CfM, #1 still holds. 34 4. When changing volume from CfM, #2 still holds. 35 5. After disconnect/re-connecting any speaker #1-4 still holds. 36 """ 37 version = 1 38 39 40 def _get_cras_jabra_speaker_nodes(self): 41 """ 42 Gets jabra speaker nodes. 43 44 @returns A list cras input nodes representing Jabra speakers. 45 """ 46 nodes = self.cras_collector.get_output_nodes() 47 return [n for n in nodes if 48 DUAL_SPEAKER_DEVICE_NAME in n.device_name] 49 50 def _get_amixer_jabra_mic_node_ids(self): 51 """ 52 Gets jabra mixer (microphone) card IDs from arecord. 53 54 @returns A list of mixer card IDs or [] if no mixer is found. 55 """ 56 cmd = ("arecord -l" 57 " | grep \"%s\"" 58 " | awk -v N=2 '{print $N}'" % DUAL_SPEAKER_DEVICE_NAME) 59 mixer_cards = [s.strip().split(':')[0] for s in 60 self._host.run_output(cmd).splitlines()] 61 return mixer_cards 62 63 def _get_cras_default_speakers(self): 64 """ 65 Gets the default speakers from cras_test_client. 66 67 @returns A list of speaker nodes. 68 """ 69 nodes = self.cras_collector.get_output_nodes() 70 return [n for n in nodes if '*(default)' in n.node_name] 71 72 def _get_cras_default_mixers(self): 73 """ 74 Gets the default mixers from cras_test_client. 75 76 @returns A list of speaker node IDs or [] if no device is found. 77 """ 78 nodes = self.cras_collector.get_input_nodes() 79 return [n for n in nodes if n.node_name.startswith('*')] 80 81 def _get_cras_jabra_mixer_nodes(self): 82 """ 83 Gets the mixer nodes from cras_test_client. 84 85 @returns A list of mixer node IDs or [] if no device is found. 86 """ 87 nodes = self.cras_collector.get_input_nodes() 88 return [n for n in nodes if DUAL_SPEAKER_DEVICE_NAME in n.node_name] 89 90 def _get_cras_speaker_volume(self, node_id): 91 """ 92 Gets the speaker volume for a node from cras_test_client. 93 94 @param node_id: the node ID to query. 95 @returns the volume of speaker. 96 """ 97 for node in self.cras_collector.get_output_nodes(): 98 if node.node_id == node_id: 99 return node.volume 100 101 def _get_mixer_mute_state(self, node_id): 102 """ 103 Gets the speaker mute state from cras_test_client. 104 105 @param node_id: the node to query. 106 @returns True if speakers is muted, otherwise False. 107 """ 108 cmd = ("amixer -c %s" 109 " | grep \"Mono: Capture\"" 110 " | awk -v N=6 '{print $N}'" % node_id) 111 mute = [s.strip() for s in 112 self._host.run_output(cmd).splitlines()][0] 113 114 if re.search(r"\[(off)\]", mute): 115 return True 116 else: 117 return False 118 119 120 def _test_volume_sync_for_dual_speakers(self): 121 """Checks whether volume is synced between dual speakers and CfM.""" 122 cfm_volume = self.cfm_facade.get_speaker_volume() 123 # There is case where cfm_facade.get_speaker_volume() returns empty 124 # string. Script polls volume from App up to 15 seconds or until 125 # non-zero value is returned. 126 poll_time = 0 127 while not cfm_volume and poll_time < TIMEOUT_SECS: 128 cfm_volume = self.cfm_facade.get_speaker_volume() 129 time.sleep(1) 130 logging.info('Checking volume set by App on CfM: %s', cfm_volume) 131 poll_time += 1 132 if not cfm_volume: 133 logging.info('Volume returned from App is Null') 134 return False 135 nodes = self._get_cras_default_speakers() 136 for node in nodes: 137 cras_volume = self._get_cras_speaker_volume(node.node_id) 138 logging.info('Volume in CfM and cras are sync for ' 139 'node %s? cfm: %s, cras: %s', 140 str(node), cfm_volume, cras_volume) 141 if int(cfm_volume) != int(cras_volume): 142 logging.error('Test _test_volume_sync_for_dual_speakers fails' 143 ' for node %s', node.device_name) 144 return False 145 return True 146 147 148 def _test_mute_state_sync_for_dual_speakers(self): 149 """Checks whether mute/unmute is sync between dual speakers and CfM.""" 150 cfm_mute = self.cfm_facade.is_mic_muted() 151 if cfm_mute: 152 logging.info('Mixer is muted from CfM.') 153 else: 154 logging.info('Mixer is not muted from CfM.') 155 156 nodes = self._get_amixer_jabra_mic_node_ids() 157 for node_id in nodes: 158 amixer_mute = self._get_mixer_mute_state(node_id) 159 if amixer_mute: 160 logging.info('amixer shows mic is muted for node %s.', node_id) 161 else: 162 logging.info('amixer shows mix not muted for node %s', node_id) 163 if not cfm_mute == amixer_mute: 164 logging.error('Test _test_mute_state_sync_for_dual_speakers ' 165 'fails for node %s', node_id) 166 return False 167 return True 168 169 170 def _has_dual_speakers(self): 171 """ 172 Checks if there are dual speakers connected to the DUT. 173 174 @returns True if there are dual speakers, false otherwise. 175 """ 176 collector = usb_device_collector.UsbDeviceCollector(self._host) 177 speakers = collector.get_devices_by_spec(JABRA) 178 return len(speakers) == 2 179 180 def _set_preferred_speaker(self, speaker_name): 181 """Set preferred speaker to Dual speaker.""" 182 logging.info('CfM sets preferred speaker to %s.', speaker_name) 183 self.cfm_facade.set_preferred_speaker(speaker_name) 184 time.sleep(TIMEOUT_SECS) 185 current_prefered_speaker = self.cfm_facade.get_preferred_speaker() 186 logging.info('Prefered speaker set to %s', current_prefered_speaker) 187 if speaker_name != current_prefered_speaker: 188 raise error.TestFail('Failed to set prefered speaker! ' 189 'Expected %s, got %s', 190 speaker_name, current_prefered_speaker) 191 192 193 def _set_preferred_mixer(self, mixer_name): 194 """Set preferred mixer/microphone to Dual speaker.""" 195 logging.info('CfM sets preferred mixer to %s.', mixer_name) 196 self.cfm_facade.set_preferred_mic(mixer_name) 197 time.sleep(TIMEOUT_SECS) 198 current_prefered_mixer = self.cfm_facade.get_preferred_speaker() 199 logging.info('Prefered mixer set to %s by CfM.', current_prefered_mixer) 200 if mixer_name != current_prefered_mixer: 201 raise error.TestFail('Failed to set prefered mixer! ' 202 'Expected %s, got %s', 203 mixer_name, current_prefered_mixer) 204 205 206 def _set_preferred_speaker_mixer(self): 207 """Sets preferred speaker and mixer to Dual speaker.""" 208 self._set_preferred_speaker(DUAL_SPEAKER_DEVICE_NAME) 209 self._set_preferred_mixer(DUAL_SPEAKER_DEVICE_NAME) 210 time.sleep(TIMEOUT_SECS) 211 default_speaker_ids = set([ 212 n.node_id for n in self._get_cras_default_speakers()]) 213 cras_speaker_ids = set([ 214 n.node_id for n in self._get_cras_jabra_speaker_nodes()]) 215 if default_speaker_ids != cras_speaker_ids: 216 raise error.TestFail('Dual speakers not set to preferred speaker ' 217 'dual=%s; cras=%s' % (default_speaker_ids, 218 cras_speaker_ids)) 219 default_mixer_ids = set([ 220 n.node_id for n in self._get_cras_default_mixers()]) 221 jabra_mixer_ids = set([ 222 n.node_id for n in self._get_cras_jabra_mixer_nodes()]) 223 if default_mixer_ids != jabra_mixer_ids: 224 raise error.TestFail('Dual mixs is not set to preferred speaker.') 225 226 227 def _test_dual_speaker_sanity(self): 228 """ 229 Performs a speaker sanity check: 230 1. Checks whether volume is sync between dual speakers and CfM 231 2. Checks whether mute/unmute is sync between dual speakers and CfM. 232 @returns True if passed, otherwise False. 233 """ 234 volume = self._test_volume_sync_for_dual_speakers() 235 mute = self._test_mute_state_sync_for_dual_speakers() 236 return volume and mute 237 238 239 def _test_mute_sync(self): 240 """ 241 Mutes and unmutes speaker from CfM. 242 Check whether mute/unmute is sync between dual speakers and CfM. 243 @returns True if yes, else retrun False. 244 """ 245 self.cfm_facade.mute_mic() 246 if not self.cfm_facade.is_mic_muted(): 247 raise error.TestFail('CFM fails to mute mic') 248 time.sleep(TIMEOUT_SECS) 249 muted = self._test_mute_state_sync_for_dual_speakers() 250 self.cfm_facade.unmute_mic() 251 if self.cfm_facade.is_mic_muted(): 252 raise error.TestFail('CFM fails to unmute mic') 253 time.sleep(TIMEOUT_SECS) 254 unmuted = self._test_mute_state_sync_for_dual_speakers() 255 return muted and unmuted 256 257 def _test_volume_sync(self): 258 """ 259 Changes speaker volume from CfM. 260 Checks whether volume is sync between dual speakers and CfM. 261 @returns True if check succeeds, otherwise False. 262 """ 263 test_volume = random.randrange(CFM_VOLUME_LEVEL_LOWER_LIMIT, 264 CFM_VOLUME_LEVEL_UPPER_LIMIT) 265 self.cfm_facade.set_speaker_volume(str(test_volume)) 266 time.sleep(TIMEOUT_SECS) 267 return self._test_volume_sync_for_dual_speakers() 268 269 270 def run_once(self): 271 """Runs the test.""" 272 logging.info('Sanity check and initilization:') 273 if not self._has_dual_speakers(): 274 raise error.TestFail('No dual speakers found on DUT.') 275 276 self.cras_collector = cras_node_collector.CrasNodeCollector(self._host) 277 278 # Remove 'board:' prefix. 279 board_name = self._host.get_board().split(':')[1] 280 gpio_list = power_cycle_usb_util.get_target_all_gpio( 281 self._host, board_name, JABRA.vendor_id, 282 JABRA.product_id) 283 if len(set(gpio_list)) == 1: 284 raise error.TestFail('Speakers have to be tied to different GPIO.') 285 286 self.cfm_facade.wait_for_hangouts_telemetry_commands() 287 288 self._set_preferred_speaker_mixer() 289 logging.info('1. Check CfM and dual speakers have the same setting ' 290 'after joining meeting:') 291 self.cfm_facade.start_new_hangout_session('test_cfm_dual_speaker') 292 if not self._test_dual_speaker_sanity(): 293 raise error.TestFail('Dual speaker Sanity verification fails.') 294 295 logging.info('1.1 Check CfM and dual microphones have the same ' 296 'setting for Mute/unmute:') 297 if not self._test_mute_sync(): 298 raise error.TestFail('Dual speaker Mute-unmute test' 299 ' verification fails') 300 301 logging.info('1.2 Check CfM and dual Speakers have same volume: ') 302 if not self._test_volume_sync(): 303 raise error.TestFail('Dual speaker volume test verification fails') 304 305 for gpio in gpio_list: 306 307 logging.info('2. Check CfM and speakers have the same setting ' 308 'after flapping speaker:') 309 logging.info('Power cycle one of Speaker %s', gpio) 310 power_cycle_usb_util.power_cycle_usb_gpio(self._host, 311 gpio, TIMEOUT_SECS) 312 time.sleep(TIMEOUT_SECS) 313 314 logging.info('2.1. Check CfM and dual speakers have same setting') 315 if not self._test_dual_speaker_sanity(): 316 raise error.TestFail('Dual speaker Sanity verification' 317 ' fails after disconnect/reconnect speaker') 318 319 logging.info('2.1.1. Check CfM and microphones have same setting ' 320 'for Mute/unmute:') 321 if not self._test_mute_sync(): 322 raise error.TestFail( 323 'Dual speaker Mute-unmute test verification fails after ' 324 'disconnect/reconnect speaker') 325 326 logging.info('2.1.2. Check CfM and dual Speakers have same volume:') 327 if not self._test_volume_sync(): 328 raise error.TestFail('Dual speaker volume test verification' 329 ' fails after disconnect/reconnect speaker') 330 331 self.cfm_facade.end_hangout_session() 332