1# Lint as: python2, python3
2# Copyright (c) 2013 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import logging
11import re
12import subprocess
13
14from autotest_lib.client.common_lib import error
15from autotest_lib.client.common_lib import utils
16from autotest_lib.client.cros.audio import cmd_utils
17from six.moves import range
18
19
# Absolute paths of the ALSA command-line utilities invoked by this module.
ACONNECT_PATH = '/usr/bin/aconnect'
ARECORD_PATH = '/usr/bin/arecord'
APLAY_PATH = '/usr/bin/aplay'
AMIXER_PATH = '/usr/bin/amixer'
# Captures the card number from a /proc/asound/cards line such as
# ' 0 [PCH            ]: HDA-Intel - HDA Intel PCH'.
CARD_NUM_RE = re.compile(r'(\d+) \[.*\]:')
# Captures the client number from an 'aconnect -io' line such as
# "client 14: 'Midi Through' [type=kernel]".
CLIENT_NUM_RE = re.compile(r'client (\d+):')
# Captures the device number from an 'arecord -l' card line.
DEV_NUM_RE = re.compile(r'.* \[.*\], device (\d+):')
# Captures the control name from an 'amixer controls' line such as
# "numid=32,iface=CARD,name='Front Headphone Jack'".
CONTROL_NAME_RE = re.compile(r"name='(.*)'")
# Captures the control name from an 'amixer scontrols' line such as
# "Simple mixer control 'Headphone',0".
SCONTROL_NAME_RE = re.compile(r"Simple mixer control '(.*)'")
# Shell command template, filled with (card number, device number), used to
# read the status of the selected output device from /proc/asound.
AUDIO_DEVICE_STATUS_CMD = 'cat /proc/asound/card%s/pcm%sp/sub0/status'
# Shell command that extracts the "Output dev:" line from the
# cras_test_client audio thread dump.
OUTPUT_DEVICE_CMD = 'cras_test_client --dump_audio_thread | grep "Output dev:"'

# Preferred record device index for specific sound cards, keyed by the card
# name that 'arecord -l' prints inside square brackets.
CARD_PREF_RECORD_DEV_IDX = {
    'bxtda7219max': 3,
}
35
36def _get_format_args(channels, bits, rate):
37    args = ['-c', str(channels)]
38    args += ['-f', 'S%d_LE' % bits]
39    args += ['-r', str(rate)]
40    return args
41
42
def get_num_soundcards():
    '''Returns the number of soundcards.

    The count is derived from /proc/asound/cards: it is the card number
    found on the last matching line plus one, or 0 when no line matches.
    Sample content:

      0 [PCH            ]: HDA-Intel - HDA Intel PCH
                           HDA Intel PCH at 0xef340000 irq 103
      1 [NVidia         ]: HDA-Intel - HDA NVidia
                           HDA NVidia at 0xef080000 irq 36
    '''

    with open('/proc/asound/cards', 'r') as cards_file:
        found = [CARD_NUM_RE.search(entry) for entry in cards_file]

    # Only the header line of each card entry carries the card number.
    card_ids = [int(hit.group(1)) for hit in found if hit]
    if not card_ids:
        return 0
    return card_ids[-1] + 1
65
66
def _get_soundcard_controls(card_id):
    '''Gets the names of the iface=CARD controls of a soundcard.

    @param card_id: Soundcard ID.
    @raise RuntimeError: If the amixer command exits with a non-zero status.

    The controls are listed with 'amixer -c <card_id> controls', whose
    output looks like:

      numid=32,iface=CARD,name='Front Headphone Jack'
      numid=28,iface=CARD,name='Front Mic Jack'
      numid=1,iface=CARD,name='HDMI/DP,pcm=3 Jack'
      numid=8,iface=CARD,name='HDMI/DP,pcm=7 Jack'

    @returns: A set with the name of every control whose line contains
              iface=CARD.
    '''

    amixer = cmd_utils.popen([AMIXER_PATH, '-c', str(card_id), 'controls'],
                             stdout=subprocess.PIPE)
    listing = amixer.communicate()[0]
    if amixer.wait() != 0:
        raise RuntimeError('amixer command failed')

    card_lines = (entry for entry in listing.splitlines()
                  if 'iface=CARD' in entry)
    name_hits = (CONTROL_NAME_RE.search(entry) for entry in card_lines)
    return set(hit.group(1) for hit in name_hits if hit)
98
99
def _get_soundcard_scontrols(card_id):
    '''Gets the names of the simple mixer controls of a soundcard.

    @param card_id: Soundcard ID.
    @raise RuntimeError: If the amixer command exits with a non-zero status.

    # TODO b:169251326 terms below are set outside of this codebase
    # and should be updated when possible. ("Master" -> "Main")
    The simple controls are listed with 'amixer -c <card_id> scontrols',
    whose output looks like:

      Simple mixer control 'Master',0
      Simple mixer control 'Headphone',0
      Simple mixer control 'Speaker',0
      Simple mixer control 'PCM',0

    @returns: A set with the quoted name from every matching line.
    '''

    amixer = cmd_utils.popen([AMIXER_PATH, '-c', str(card_id), 'scontrols'],
                             stdout=subprocess.PIPE)
    listing = amixer.communicate()[0]
    if amixer.wait() != 0:
        raise RuntimeError('amixer command failed')

    name_hits = (SCONTROL_NAME_RE.search(entry)
                 for entry in listing.splitlines())
    return set(hit.group(1) for hit in name_hits if hit)
131
132
def get_first_soundcard_with_control(cname, scname):
    '''Returns the ID of the first soundcard with a matching control.

    Cards are examined in increasing ID order. For each card the
    iface=CARD controls are checked against cname first, then the simple
    mixer controls against scname. Both names are inserted into a
    case-insensitive, word-bounded regular expression.

    @param cname: Control name to look for.
    @param scname: Simple control name to look for.

    @returns: The soundcard ID, or None if no card matches.
    '''

    control_re = re.compile(r'\b%s\b' % cname, re.IGNORECASE)
    scontrol_re = re.compile(r'\b%s\b' % scname, re.IGNORECASE)
    lookups = (
        (control_re, _get_soundcard_controls),
        (scontrol_re, _get_soundcard_scontrols),
    )

    for card_id in range(get_num_soundcards()):
        for pattern, list_names in lookups:
            for name in list_names(card_id):
                if pattern.search(name):
                    return card_id
    return None
148
149
def get_soundcard_names():
    '''Returns a dictionary of card names, keyed by card number.

    The names come from the output of 'alsa_helpers -l'. Each output line
    is split at its first comma: the text before it becomes the key and
    the text after it becomes the value. Both are kept as strings; the
    card number is not converted to an integer.

    @returns: A dict mapping card number strings to card name strings.
    @raise RuntimeError: If the alsa_helpers command fails.
    @raise ValueError: If an output line contains no comma.
    '''

    cmd = "alsa_helpers -l"
    try:
        output = utils.system_output(command=cmd, retain_output=True)
    except error.CmdError:
        raise RuntimeError('alsa_helpers -l failed to return card names')

    # Split on the first comma only, so a card name that itself contains a
    # comma stays intact instead of breaking the (index, name) pairing.
    return dict((index, name) for index, name in (
        line.split(',', 1) for line in output.splitlines()))
161
162
def get_default_playback_device():
    '''Gets the default playback device.

    The device is the first soundcard exposing a 'Headphone Jack' control
    or a 'Headphone' simple control.

    @returns: The device as 'plughw:<card id>', or None if no such
              soundcard is found.
    '''

    card_id = get_first_soundcard_with_control(cname='Headphone Jack',
                                               scname='Headphone')
    return None if card_id is None else 'plughw:%d' % card_id
174
def get_record_card_name(card_idx):
    '''Gets the recording sound card name for the given card index.

    @param card_idx: Soundcard index to look up in 'arecord -l' output.
    @raise RuntimeError: If the arecord command exits with a non-zero
                         status.

    @returns: The text inside the square brackets of the first
              'card <card_idx>: ...' line, or None if no line matches.
    '''
    name_pattern = re.compile(r'card %d: .*?\[(.*?)\]' % card_idx)
    arecord = cmd_utils.popen([ARECORD_PATH, '-l'], stdout=subprocess.PIPE)
    listing = arecord.communicate()[0]
    if arecord.wait() != 0:
        raise RuntimeError('arecord -l command failed')

    hits = (name_pattern.search(entry) for entry in listing.splitlines())
    return next((hit.group(1) for hit in hits if hit), None)
192
193
def get_record_device_supported_channels(device):
    '''Gets the channel counts supported by the record device.

    The values are read from the output of
    'alsa_helpers --device <device> --get_capture_channels', one per line.

    @param device: The device to record the audio. E.g. hw:0,1

    @returns: A list of integers, or None if the command fails or prints
              nothing. Either failure is logged as an error.
    '''
    query = "alsa_helpers --device %s --get_capture_channels" % device
    try:
        listing = utils.system_output(command=query, retain_output=True)
    except error.CmdError:
        logging.error("Fail to get supported channels for %s", device)
        return None

    channel_lines = listing.splitlines()
    if channel_lines:
        return [int(value) for value in channel_lines]

    logging.error("Supported channels are empty for %s", device)
    return None
214
215
def get_default_record_device():
    '''Gets the default record device.

    The card is the first soundcard exposing a 'Mic Jack' control or a
    'Mic' simple control. If that card's name is listed in
    CARD_PREF_RECORD_DEV_IDX, the device index given there is used;
    otherwise the first device that 'arecord -l' lists for the card is
    used, falling back to device 0 when none is listed.

    @raise RuntimeError: If the arecord command exits with a non-zero
                         status.

    @returns: The device as 'plughw:<card id>,<device id>', or None if no
              matching soundcard is found.
    '''

    card_id = get_first_soundcard_with_control(cname='Mic Jack', scname='Mic')
    if card_id is None:
        return None

    card_name = get_record_card_name(card_id)
    if card_name in CARD_PREF_RECORD_DEV_IDX:
        preferred_dev = CARD_PREF_RECORD_DEV_IDX[card_name]
        return 'plughw:%d,%d' % (card_id, preferred_dev)

    # No preference recorded for this card: look up its first device id.
    arecord = cmd_utils.popen([ARECORD_PATH, '-l'], stdout=subprocess.PIPE)
    listing = arecord.communicate()[0]
    if arecord.wait() != 0:
        raise RuntimeError('arecord -l command failed')

    card_tag = 'card %d:' % card_id
    dev_hits = (DEV_NUM_RE.search(entry) for entry in listing.splitlines()
                if card_tag in entry)
    dev_id = next((int(hit.group(1)) for hit in dev_hits if hit), 0)
    return 'plughw:%d,%d' % (card_id, dev_id)
245
246
def _get_sysdefault(cmd):
    '''Runs cmd and returns its first output line mentioning sysdefault.

    @param cmd: The command to run, as an argument list.
    @raise RuntimeError: If the command exits with a non-zero status.

    @returns: The first output line containing 'sysdefault', or None if
              there is no such line.
    '''
    proc = cmd_utils.popen(cmd, stdout=subprocess.PIPE)
    listing = proc.communicate()[0]
    if proc.wait() != 0:
        raise RuntimeError('%s failed' % cmd)

    sysdefault_lines = (entry for entry in listing.splitlines()
                        if 'sysdefault' in entry)
    return next(sysdefault_lines, None)
257
258
def get_sysdefault_playback_device():
    '''Gets the sysdefault device line from 'aplay -L' output.

    @returns: The first output line containing 'sysdefault', or None.
    '''

    aplay_list_cmd = [APLAY_PATH, '-L']
    return _get_sysdefault(aplay_list_cmd)
263
264
def get_sysdefault_record_device():
    '''Gets the sysdefault device line from 'arecord -L' output.

    @returns: The first output line containing 'sysdefault', or None.
    '''

    arecord_list_cmd = [ARECORD_PATH, '-L']
    return _get_sysdefault(arecord_list_cmd)
269
270
def playback(*args, **kwargs):
    '''Runs the aplay command line built by playback_cmd.

    @param args: Positional arguments forwarded to playback_cmd.
    @param kwargs: Keyword arguments forwarded to playback_cmd.
    '''
    aplay_args = playback_cmd(*args, **kwargs)
    cmd_utils.execute(aplay_args)
277
278
def playback_cmd(
        input, duration=None, channels=2, bits=16, rate=48000, device=None):
    '''Builds the 'aplay' command line that plays the given input audio.

    @param input: The input audio to be played.
    @param duration: The length of the playback (in seconds); omitted
                     from the command line when None.
    @param channels: The number of channels of the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.
    @param device: The device to play the audio on. E.g. hw:0,1. The
                   value is prefixed with 'plug'. When None, the default
                   playback device is used.
    @raise RuntimeError: If no playback device is available.

    @returns: The command as a list of arguments.
    '''
    duration_args = [] if duration is None else ['-d', str(duration)]
    format_args = _get_format_args(channels, bits, rate)

    if device is not None:
        target = "plug%s" % device
    else:
        target = get_default_playback_device()
        if target is None:
            raise RuntimeError('no playback device')

    return [APLAY_PATH] + duration_args + format_args + ['-D', target, input]
304
305
def record(*args, **kwargs):
    '''Runs the arecord command line built by record_cmd.

    @param args: Positional arguments forwarded to record_cmd.
    @param kwargs: Keyword arguments forwarded to record_cmd.
    '''
    arecord_args = record_cmd(*args, **kwargs)
    cmd_utils.execute(arecord_args)
312
313
def record_cmd(
        output, duration=None, channels=1, bits=16, rate=48000, device=None):
    '''Builds the 'arecord' command line that records to the given output.

    @param output: The filename where the recorded audio will be stored to.
    @param duration: The length of the recording (in seconds); omitted
                     from the command line when None.
    @param channels: The number of channels of the recorded audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.
    @param device: The device used to record the audio from. E.g. hw:0,1.
                   The value is prefixed with 'plug'. When None, the
                   default record device is used.
    @raise RuntimeError: If no record device is available.

    @returns: The command as a list of arguments.
    '''
    duration_args = [] if duration is None else ['-d', str(duration)]
    format_args = _get_format_args(channels, bits, rate)

    if device is not None:
        source = "plug%s" % device
    else:
        source = get_default_record_device()
        if source is None:
            raise RuntimeError('no record device')

    return [ARECORD_PATH] + duration_args + format_args + ['-D', source,
                                                           output]
339
340
def mixer_cmd(card_id, cmd):
    '''Executes an amixer command against a soundcard.

    @param card_id: Soundcard ID.
    @param cmd: Amixer command to execute, as a list of arguments.
    @raise RuntimeError: If amixer exits with a non-zero status.

    For example, cmd ['set', 'PCM', '2dB+'] with card_id 1 is executed as:
        amixer -c 1 set PCM 2dB+

    @returns: The captured standard output of the command.
    '''

    amixer_args = [AMIXER_PATH, '-c', str(card_id)] + cmd
    amixer = cmd_utils.popen(amixer_args, stdout=subprocess.PIPE)
    captured = amixer.communicate()[0]
    if amixer.wait() != 0:
        raise RuntimeError('amixer command failed')
    return captured
361
362
def get_num_seq_clients():
    '''Returns the number of seq clients.

    The clients are counted from the output of 'aconnect -io': every line
    that starts with 'client <number>:' counts as one client.
    The command is run as the chronos user to catch permissions problems.
    Sample content:

      client 0: 'System' [type=kernel]
          0 'Timer           '
          1 'Announce        '
      client 14: 'Midi Through' [type=kernel]
          0 'Midi Through Port-0'

    Any error raised by cmd_utils.execute while running aconnect is
    propagated to the caller.
    '''
    listing = cmd_utils.execute([ACONNECT_PATH, '-io'],
                                stdout=subprocess.PIPE, run_as='chronos')
    return sum(1 for entry in listing.splitlines()
               if CLIENT_NUM_RE.match(entry))
386
def convert_device_name(cras_device_name):
    '''Converts a cras device name to an alsa device name.

    The cras name is split on ':' and its third field is taken as the
    '<card>,<device>' part of the alsa name.

    @param cras_device_name: The cras device name,
                             e.g. "kbl_r5514_5663_max: :0,1".

    @returns: alsa device name that can be passed to aplay -D or arecord -D.
              For example, if cras_device_name is "kbl_r5514_5663_max: :0,1",
              this function will return "hw:0,1".
    '''
    card_and_device = cras_device_name.split(":")[2]
    return "hw:%s" % card_and_device
396
def check_audio_stream_at_selected_device(device_name, device_type):
    """Checks that audio is routed through the expected output node.

    For a BLUETOOTH device, the "Output dev:" entry of the cras_test_client
    audio thread dump is compared against device_name. For any other device
    type, the card and device numbers are parsed from device_name and the
    matching status file under /proc/asound is inspected.

    @param device_name: Audio output device name, Ex: kbl_r5514_5663_max: :0,1
    @param device_type: Audio output device type, Ex: INTERNAL_SPEAKER
    @raise error.TestFail: If audio is not routed through the expected node.
    @raise error.TestError: If device_name does not contain
                            ':<card>,<device>', or if the device status is
                            neither RUNNING nor closed.
    """
    if device_type == 'BLUETOOTH':
        output_device_output = utils.system_output(OUTPUT_DEVICE_CMD).strip()
        bt_device = output_device_output.split('Output dev:')[1].strip()
        if bt_device != device_name:
            raise error.TestFail("Audio is not routing through expected node")
        logging.info('Audio is routing through %s', bt_device)
        return

    # Match whole numbers so that multi-digit card or device ids are not
    # truncated to their first digit.
    card_device_search = re.search(r':(\d+),(\d+)', device_name)
    if card_device_search is None:
        # Fail before the numbers are used; otherwise an unmatched name
        # would surface as an unbound-variable error.
        raise error.TestError("Audio device name is not in expected format")
    card_num, device_num = card_device_search.groups()
    logging.debug("Sound card number is %s", card_num)
    logging.debug("Device number is %s", device_num)

    device_status_output = utils.system_output(AUDIO_DEVICE_STATUS_CMD %
                                               (card_num, device_num))
    logging.debug("Selected output device status is %s",
                  device_status_output)

    if 'RUNNING' in device_status_output:
        logging.info("Audio is routing through expected node!")
    elif 'closed' in device_status_output:
        raise error.TestFail("Audio is not routing through expected audio "
                             "node!")
    else:
        raise error.TestError("Audio routing error! Device may be "
                              "preparing")