1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import random
7import re
8import time
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib.cros import power_cycle_usb_util
12from autotest_lib.client.common_lib.cros.cfm import cras_node_collector
13from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices
14from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
15from autotest_lib.server.cros.cfm import cfm_base_test
16
17
18# CFMs have a base volume level threshold. Setting the level below 2
19# is interpreted by the CFM as 0.
20CFM_VOLUME_LEVEL_LOWER_LIMIT = 2
21CFM_VOLUME_LEVEL_UPPER_LIMIT = 100
22JABRA = cfm_usb_devices.JABRA_SPEAK_410
23DUAL_SPEAKER_DEVICE_NAME = JABRA.product
24TIMEOUT_SECS  = 10
25
26
27class enterprise_CFM_DualSpeaker(cfm_base_test.CfmBaseTest):
28    """
29    Tests that the following functionality works on CfM enrolled devices:
30
31       1. Mixer mute/umute state should be in sync between CfM and 2 speakers.
32       2. Volume of two speakers should be in sync with the volume set by CfM.
33       3. When muting/unmuting speakers from CfM, #1 still holds.
34       4. When changing volume from CfM, #2 still holds.
35       5. After disconnect/re-connecting any speaker #1-4 still holds.
36    """
37    version = 1
38
39
40    def _get_cras_jabra_speaker_nodes(self):
41        """
42        Gets jabra speaker nodes.
43
44        @returns A list cras input nodes representing Jabra speakers.
45        """
46        nodes = self.cras_collector.get_output_nodes()
47        return [n for n in nodes if
48                DUAL_SPEAKER_DEVICE_NAME in n.device_name]
49
50    def _get_amixer_jabra_mic_node_ids(self):
51        """
52        Gets jabra mixer (microphone) card IDs from arecord.
53
54        @returns A list of mixer card IDs or [] if no mixer is found.
55        """
56        cmd = ("arecord -l"
57               " | grep \"%s\""
58               " | awk -v N=2 '{print $N}'" % DUAL_SPEAKER_DEVICE_NAME)
59        mixer_cards = [s.strip().split(':')[0] for s in
60                       self._host.run_output(cmd).splitlines()]
61        return mixer_cards
62
63    def _get_cras_default_speakers(self):
64        """
65        Gets the default speakers from cras_test_client.
66
67        @returns A list of speaker nodes.
68        """
69        nodes = self.cras_collector.get_output_nodes()
70        return [n for n in nodes if '*(default)' in n.node_name]
71
72    def _get_cras_default_mixers(self):
73        """
74        Gets the default mixers from cras_test_client.
75
76        @returns A list of speaker node IDs or [] if no device is found.
77        """
78        nodes = self.cras_collector.get_input_nodes()
79        return [n for n in nodes if n.node_name.startswith('*')]
80
81    def _get_cras_jabra_mixer_nodes(self):
82        """
83        Gets the mixer nodes from cras_test_client.
84
85        @returns A list of mixer node IDs or [] if no device is found.
86        """
87        nodes = self.cras_collector.get_input_nodes()
88        return [n for n in nodes if DUAL_SPEAKER_DEVICE_NAME in n.node_name]
89
90    def _get_cras_speaker_volume(self, node_id):
91        """
92        Gets the speaker volume for a node from cras_test_client.
93
94        @param node_id: the node ID to query.
95        @returns the volume of speaker.
96        """
97        for node in self.cras_collector.get_output_nodes():
98            if node.node_id == node_id:
99                return node.volume
100
101    def _get_mixer_mute_state(self, node_id):
102        """
103        Gets the speaker mute state from cras_test_client.
104
105        @param node_id: the node to query.
106        @returns True if speakers is muted, otherwise False.
107        """
108        cmd = ("amixer -c %s"
109               " | grep \"Mono: Capture\""
110               " | awk -v N=6 '{print $N}'" % node_id)
111        mute = [s.strip() for s in
112                self._host.run_output(cmd).splitlines()][0]
113
114        if re.search(r"\[(off)\]", mute):
115            return  True
116        else:
117            return False
118
119
120    def _test_volume_sync_for_dual_speakers(self):
121        """Checks whether volume is synced between dual speakers and CfM."""
122        cfm_volume = self.cfm_facade.get_speaker_volume()
123        # There is case where cfm_facade.get_speaker_volume() returns empty
124        # string. Script polls volume from App up to 15 seconds or until
125        # non-zero value is returned.
126        poll_time = 0
127        while not cfm_volume and poll_time < TIMEOUT_SECS:
128            cfm_volume = self.cfm_facade.get_speaker_volume()
129            time.sleep(1)
130            logging.info('Checking volume set by App on CfM: %s', cfm_volume)
131            poll_time += 1
132        if not cfm_volume:
133            logging.info('Volume returned from App is Null')
134            return False
135        nodes = self._get_cras_default_speakers()
136        for node in nodes:
137            cras_volume =  self._get_cras_speaker_volume(node.node_id)
138            logging.info('Volume in CfM and cras are sync for '
139                    'node %s? cfm: %s, cras: %s',
140                     str(node), cfm_volume, cras_volume)
141            if int(cfm_volume) != int(cras_volume):
142                logging.error('Test _test_volume_sync_for_dual_speakers fails'
143                              ' for node %s', node.device_name)
144                return False
145        return True
146
147
148    def _test_mute_state_sync_for_dual_speakers(self):
149        """Checks whether mute/unmute is sync between dual speakers and CfM."""
150        cfm_mute = self.cfm_facade.is_mic_muted()
151        if cfm_mute:
152            logging.info('Mixer is muted from CfM.')
153        else:
154            logging.info('Mixer is not muted from CfM.')
155
156        nodes = self._get_amixer_jabra_mic_node_ids()
157        for node_id in nodes:
158            amixer_mute =  self._get_mixer_mute_state(node_id)
159            if amixer_mute:
160                logging.info('amixer shows mic is muted for node %s.', node_id)
161            else:
162                logging.info('amixer shows mix not muted for node %s', node_id)
163            if not cfm_mute == amixer_mute:
164                logging.error('Test _test_mute_state_sync_for_dual_speakers '
165                              'fails for node %s', node_id)
166                return False
167        return True
168
169
170    def _has_dual_speakers(self):
171        """
172        Checks if there are dual speakers connected to the DUT.
173
174        @returns True if there are dual speakers, false otherwise.
175        """
176        collector = usb_device_collector.UsbDeviceCollector(self._host)
177        speakers = collector.get_devices_by_spec(JABRA)
178        return len(speakers) == 2
179
180    def _set_preferred_speaker(self, speaker_name):
181        """Set preferred speaker to Dual speaker."""
182        logging.info('CfM sets preferred speaker to %s.', speaker_name)
183        self.cfm_facade.set_preferred_speaker(speaker_name)
184        time.sleep(TIMEOUT_SECS)
185        current_prefered_speaker = self.cfm_facade.get_preferred_speaker()
186        logging.info('Prefered speaker set to %s', current_prefered_speaker)
187        if speaker_name != current_prefered_speaker:
188            raise error.TestFail('Failed to set prefered speaker! '
189                                 'Expected %s, got %s',
190                                 speaker_name, current_prefered_speaker)
191
192
193    def _set_preferred_mixer(self, mixer_name):
194        """Set preferred mixer/microphone to Dual speaker."""
195        logging.info('CfM sets preferred mixer to %s.', mixer_name)
196        self.cfm_facade.set_preferred_mic(mixer_name)
197        time.sleep(TIMEOUT_SECS)
198        current_prefered_mixer = self.cfm_facade.get_preferred_speaker()
199        logging.info('Prefered mixer set to %s by CfM.', current_prefered_mixer)
200        if mixer_name != current_prefered_mixer:
201            raise error.TestFail('Failed to set prefered mixer! '
202                                 'Expected %s, got %s',
203                                 mixer_name, current_prefered_mixer)
204
205
206    def  _set_preferred_speaker_mixer(self):
207        """Sets preferred speaker and mixer to Dual speaker."""
208        self._set_preferred_speaker(DUAL_SPEAKER_DEVICE_NAME)
209        self._set_preferred_mixer(DUAL_SPEAKER_DEVICE_NAME)
210        time.sleep(TIMEOUT_SECS)
211        default_speaker_ids = set([
212            n.node_id for n in self._get_cras_default_speakers()])
213        cras_speaker_ids = set([
214            n.node_id for n in self._get_cras_jabra_speaker_nodes()])
215        if default_speaker_ids != cras_speaker_ids:
216            raise error.TestFail('Dual speakers not set to preferred speaker '
217                                 'dual=%s; cras=%s' % (default_speaker_ids,
218                                                       cras_speaker_ids))
219        default_mixer_ids = set([
220            n.node_id for n in self._get_cras_default_mixers()])
221        jabra_mixer_ids = set([
222            n.node_id for n in self._get_cras_jabra_mixer_nodes()])
223        if default_mixer_ids != jabra_mixer_ids:
224            raise error.TestFail('Dual mixs is not set to preferred speaker.')
225
226
227    def _test_dual_speaker_sanity(self):
228        """
229        Performs a speaker sanity check:
230            1. Checks whether volume is sync between dual speakers and CfM
231            2. Checks whether mute/unmute is sync between dual speakers and CfM.
232        @returns True if passed, otherwise False.
233        """
234        volume = self._test_volume_sync_for_dual_speakers()
235        mute = self._test_mute_state_sync_for_dual_speakers()
236        return volume and mute
237
238
239    def _test_mute_sync(self):
240        """
241        Mutes and unmutes speaker from CfM.
242        Check whether mute/unmute is sync between dual speakers and CfM.
243        @returns True if yes, else retrun False.
244        """
245        self.cfm_facade.mute_mic()
246        if not self.cfm_facade.is_mic_muted():
247            raise error.TestFail('CFM fails to mute mic')
248        time.sleep(TIMEOUT_SECS)
249        muted = self._test_mute_state_sync_for_dual_speakers()
250        self.cfm_facade.unmute_mic()
251        if self.cfm_facade.is_mic_muted():
252            raise error.TestFail('CFM fails to unmute mic')
253        time.sleep(TIMEOUT_SECS)
254        unmuted = self._test_mute_state_sync_for_dual_speakers()
255        return muted and unmuted
256
257    def _test_volume_sync(self):
258        """
259        Changes speaker volume from CfM.
260        Checks whether volume is sync between dual speakers and CfM.
261        @returns True if check succeeds, otherwise False.
262        """
263        test_volume = random.randrange(CFM_VOLUME_LEVEL_LOWER_LIMIT,
264                                       CFM_VOLUME_LEVEL_UPPER_LIMIT)
265        self.cfm_facade.set_speaker_volume(str(test_volume))
266        time.sleep(TIMEOUT_SECS)
267        return self._test_volume_sync_for_dual_speakers()
268
269
270    def run_once(self):
271        """Runs the test."""
272        logging.info('Sanity check and initilization:')
273        if not self._has_dual_speakers():
274            raise error.TestFail('No dual speakers found on DUT.')
275
276        self.cras_collector = cras_node_collector.CrasNodeCollector(self._host)
277
278        # Remove 'board:' prefix.
279        board_name = self._host.get_board().split(':')[1]
280        gpio_list = power_cycle_usb_util.get_target_all_gpio(
281            self._host, board_name, JABRA.vendor_id,
282            JABRA.product_id)
283        if len(set(gpio_list)) == 1:
284            raise error.TestFail('Speakers have to be tied to different GPIO.')
285
286        self.cfm_facade.wait_for_hangouts_telemetry_commands()
287
288        self._set_preferred_speaker_mixer()
289        logging.info('1. Check CfM and dual speakers have the same setting '
290                     'after joining meeting:')
291        self.cfm_facade.start_new_hangout_session('test_cfm_dual_speaker')
292        if not self._test_dual_speaker_sanity():
293            raise error.TestFail('Dual speaker Sanity verification fails.')
294
295        logging.info('1.1 Check CfM and dual microphones have the same '
296                     'setting for Mute/unmute:')
297        if not self._test_mute_sync():
298            raise error.TestFail('Dual speaker Mute-unmute test'
299                    ' verification fails')
300
301        logging.info('1.2 Check CfM and dual Speakers have same volume: ')
302        if not self._test_volume_sync():
303            raise error.TestFail('Dual speaker volume test verification fails')
304
305        for gpio in gpio_list:
306
307            logging.info('2. Check CfM and speakers have the same setting '
308                         'after flapping speaker:')
309            logging.info('Power cycle one of Speaker %s', gpio)
310            power_cycle_usb_util.power_cycle_usb_gpio(self._host,
311                    gpio, TIMEOUT_SECS)
312            time.sleep(TIMEOUT_SECS)
313
314            logging.info('2.1. Check CfM and dual speakers have same setting')
315            if not self._test_dual_speaker_sanity():
316                raise error.TestFail('Dual speaker Sanity verification'
317                        ' fails after disconnect/reconnect speaker')
318
319            logging.info('2.1.1. Check CfM and microphones have same setting '
320                         'for Mute/unmute:')
321            if not self._test_mute_sync():
322                raise error.TestFail(
323                    'Dual speaker Mute-unmute test verification fails after '
324                    'disconnect/reconnect speaker')
325
326            logging.info('2.1.2. Check CfM and dual Speakers have same volume:')
327            if not self._test_volume_sync():
328                raise error.TestFail('Dual speaker volume test verification'
329                        ' fails after disconnect/reconnect speaker')
330
331        self.cfm_facade.end_hangout_session()
332