1# Copyright (c) 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import re
7import subprocess
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib import utils
11from autotest_lib.client.cros.audio import cmd_utils
12
13
# Absolute paths of the ALSA command line utilities invoked by this module.
ACONNECT_PATH = '/usr/bin/aconnect'
ARECORD_PATH = '/usr/bin/arecord'
APLAY_PATH = '/usr/bin/aplay'
AMIXER_PATH = '/usr/bin/amixer'
# Captures the card number from a /proc/asound/cards line, e.g.
# " 0 [PCH            ]: HDA-Intel - HDA Intel PCH".
CARD_NUM_RE = re.compile(r'(\d+) \[.*\]:')
# Captures the client number from an 'aconnect -io' line, e.g.
# "client 14: 'Midi Through' [type=kernel]".
CLIENT_NUM_RE = re.compile(r'client (\d+):')
# Captures the device number from an 'arecord -l' line.
DEV_NUM_RE = re.compile(r'.* \[.*\], device (\d+):')
# Captures the control name from an 'amixer controls' line, e.g.
# "numid=32,iface=CARD,name='Front Headphone Jack'".
CONTROL_NAME_RE = re.compile(r"name='(.*)'")
# Captures the control name from an 'amixer scontrols' line, e.g.
# "Simple mixer control 'Master',0".
SCONTROL_NAME_RE = re.compile(r"Simple mixer control '(.*)'")
# Shell command template; formatted with (card number, device number) to read
# the status of the first substream of a playback PCM.
AUDIO_DEVICE_STATUS_CMD = 'cat /proc/asound/card%s/pcm%sp/sub0/status'
# Shell command that extracts the "Output dev:" line from the cras audio
# thread dump.
OUTPUT_DEVICE_CMD = 'cras_test_client --dump_audio_thread | grep "Output dev:"'

# Maps a card name (as returned by get_record_card_name) to the device index
# that get_default_record_device uses for that card instead of the first
# device listed by 'arecord -l'.
CARD_PREF_RECORD_DEV_IDX = {
    'bxtda7219max': 3,
}
29
30def _get_format_args(channels, bits, rate):
31    args = ['-c', str(channels)]
32    args += ['-f', 'S%d_LE' % bits]
33    args += ['-r', str(rate)]
34    return args
35
36
def get_num_soundcards():
    '''Returns the number of soundcards.

    The count is derived from /proc/asound/cards: it is the card number of
    the last card line found in that file plus one, or 0 when no card line
    is found.  Sample content:

      0 [PCH            ]: HDA-Intel - HDA Intel PCH
                           HDA Intel PCH at 0xef340000 irq 103
      1 [NVidia         ]: HDA-Intel - HDA NVidia
                           HDA NVidia at 0xef080000 irq 36
    '''

    # -1 stands for "no card seen yet", so that an empty listing yields 0.
    last_card_id = -1
    with open('/proc/asound/cards', 'r') as cards_file:
        for entry in cards_file:
            found = CARD_NUM_RE.search(entry)
            if found is not None:
                last_card_id = int(found.group(1))
    return last_card_id + 1
59
60
def _get_soundcard_controls(card_id):
    '''Gets the names of the card-level controls of a soundcard.

    @param card_id: Soundcard ID.
    @returns: A set of control names.
    @raise RuntimeError: If the amixer command exits with non-zero status.

    The controls are listed by the 'amixer controls' command, whose output
    looks like:

      numid=32,iface=CARD,name='Front Headphone Jack'
      numid=28,iface=CARD,name='Front Mic Jack'
      numid=1,iface=CARD,name='HDMI/DP,pcm=3 Jack'
      numid=8,iface=CARD,name='HDMI/DP,pcm=7 Jack'

    Only lines containing iface=CARD contribute to the result.
    '''

    amixer = cmd_utils.popen([AMIXER_PATH, '-c', str(card_id), 'controls'],
                             stdout=subprocess.PIPE)
    stdout_data = amixer.communicate()[0]
    if amixer.wait() != 0:
        raise RuntimeError('amixer command failed')

    found = (CONTROL_NAME_RE.search(entry)
             for entry in stdout_data.splitlines()
             if 'iface=CARD' in entry)
    return set(hit.group(1) for hit in found if hit)
92
93
def _get_soundcard_scontrols(card_id):
    '''Gets the names of the simple mixer controls of a soundcard.

    @param card_id: Soundcard ID.
    @returns: A set of simple mixer control names.
    @raise RuntimeError: If the amixer command exits with non-zero status.

    The simple mixer controls are listed by the 'amixer scontrols' command,
    whose output looks like:

      Simple mixer control 'Master',0
      Simple mixer control 'Headphone',0
      Simple mixer control 'Speaker',0
      Simple mixer control 'PCM',0
    '''

    amixer = cmd_utils.popen([AMIXER_PATH, '-c', str(card_id), 'scontrols'],
                             stdout=subprocess.PIPE)
    stdout_data = amixer.communicate()[0]
    if amixer.wait() != 0:
        raise RuntimeError('amixer command failed')

    names = set()
    for entry in stdout_data.splitlines():
        found = SCONTROL_NAME_RE.search(entry)
        if found is not None:
            names.add(found.group(1))
    return names
123
124
def get_first_soundcard_with_control(cname, scname):
    '''Returns the ID of the first soundcard with a matching control name.

    Cards are examined in increasing ID order.  A card matches when one of
    its controls matches cname, or one of its simple mixer controls matches
    scname.  Matching is case-insensitive and anchored on word boundaries.

    @param cname: Control name to look for.  It is inserted into a regular
                  expression as is, so regex metacharacters are interpreted.
    @param scname: Simple control name to look for.  Same regex caveat as
                   cname.
    @returns: The soundcard ID, or None if no card matches.
    @raise RuntimeError: If an amixer query fails.
    '''

    cpat = re.compile(r'\b%s\b' % cname, re.IGNORECASE)
    scpat = re.compile(r'\b%s\b' % scname, re.IGNORECASE)
    # range() rather than xrange(): the latter only exists in python 2, and
    # the number of soundcards is small enough that a list costs nothing.
    for card_id in range(get_num_soundcards()):
        for pat, func in [(cpat, _get_soundcard_controls),
                          (scpat, _get_soundcard_scontrols)]:
            if any(pat.search(c) for c in func(card_id)):
                return card_id
    return None
140
141
def get_soundcard_names():
    '''Returns a dictionary of card names, keyed by card number.

    The names come from the output of 'alsa_helpers -l', which is parsed as
    one "<number>,<name>" entry per line.  The keys are kept as the strings
    printed by the command; they are not converted to integers.

    @returns: A dict mapping card number (string) to card name.
    @raise RuntimeError: If the alsa_helpers command fails.
    @raise ValueError: If an output line contains no comma.
    '''

    cmd = "alsa_helpers -l"
    try:
        output = utils.system_output(command=cmd, retain_output=True)
    except error.CmdError:
        raise RuntimeError('alsa_helpers -l failed to return card names')

    names = {}
    for line in output.splitlines():
        # Split on the first comma only so that a card name which itself
        # contains a comma does not break the two-way unpacking.
        index, name = line.split(',', 1)
        names[index] = name
    return names
153
154
def get_default_playback_device():
    '''Gets the default playback device.

    The device is the first soundcard that has a 'Headphone Jack' control or
    a 'Headphone' simple mixer control.

    @returns: The device name in the form 'plughw:<card id>', or None if no
              such soundcard is found.
    '''

    headphone_card = get_first_soundcard_with_control(
            cname='Headphone Jack', scname='Headphone')
    return None if headphone_card is None else 'plughw:%d' % headphone_card
166
def get_record_card_name(card_idx):
    '''Gets the recording sound card name for the given card index.

    @param card_idx: Soundcard index, as an integer.
    @returns: The text inside the square brackets of the first
              'card <card_idx>: ...' line printed by 'arecord -l', or None
              if no line matches.
    @raise RuntimeError: If the arecord command exits with non-zero status.
    '''
    pattern = re.compile(r'card %d: .*?\[(.*?)\]' % card_idx)
    arecord = cmd_utils.popen([ARECORD_PATH, '-l'], stdout=subprocess.PIPE)
    listing = arecord.communicate()[0]
    if arecord.wait() != 0:
        raise RuntimeError('arecord -l command failed')

    hits = (pattern.search(entry) for entry in listing.splitlines())
    return next((hit.group(1) for hit in hits if hit), None)
184
185
def get_record_device_supported_channels(device):
    '''Gets the channel counts supported by a record device.

    The values are queried with 'alsa_helpers --get_capture_channels'; each
    line of its output is converted to an integer.

    @param device: The device to record the audio. E.g. hw:0,1
    @returns: A list of integers, or None if the command fails or prints
              nothing.
    '''
    query = "alsa_helpers --device %s --get_capture_channels" % device
    try:
        raw_output = utils.system_output(command=query, retain_output=True)
    except error.CmdError:
        logging.error("Fail to get supported channels for %s", device)
        return None

    channel_lines = raw_output.splitlines()
    if channel_lines:
        return [int(count) for count in channel_lines]

    logging.error("Supported channels are empty for %s", device)
    return None
206
207
def get_default_record_device():
    '''Gets the default record device.

    The card is the first soundcard that has a 'Mic Jack' control or a 'Mic'
    simple mixer control.  The device on that card is the index listed for
    the card name in CARD_PREF_RECORD_DEV_IDX when there is one; otherwise
    it is the first device of the card listed by 'arecord -l', falling back
    to device 0 when no device line can be parsed.

    @returns: The device name in the form 'plughw:<card id>,<device id>', or
              None if no suitable soundcard is found.
    @raise RuntimeError: If the 'arecord -l' command fails.
    '''

    card_id = get_first_soundcard_with_control(cname='Mic Jack', scname='Mic')
    if card_id is None:
        return None

    card_name = get_record_card_name(card_id)
    # 'in' replaces the deprecated dict.has_key(), which python 3 removed.
    if card_name in CARD_PREF_RECORD_DEV_IDX:
        return 'plughw:%d,%d' % (card_id, CARD_PREF_RECORD_DEV_IDX[card_name])

    # Get first device id of this card.
    cmd = [ARECORD_PATH, '-l']
    p = cmd_utils.popen(cmd, stdout=subprocess.PIPE)
    output, _ = p.communicate()
    if p.wait() != 0:
        raise RuntimeError('arecord -l command failed')

    # Stays 0 when no line of this card carries a parsable device number.
    dev_id = 0
    for line in output.splitlines():
        if 'card %d:' % card_id in line:
            match = DEV_NUM_RE.search(line)
            if match:
                dev_id = int(match.group(1))
                break
    return 'plughw:%d,%d' % (card_id, dev_id)
237
238
def _get_sysdefault(cmd):
    '''Runs a device listing command and picks its sysdefault line.

    @param cmd: The command to run, as an argument list.
    @returns: The first output line containing 'sysdefault', or None if
              there is no such line.
    @raise RuntimeError: If the command exits with non-zero status.
    '''
    proc = cmd_utils.popen(cmd, stdout=subprocess.PIPE)
    listing = proc.communicate()[0]
    if proc.wait() != 0:
        raise RuntimeError('%s failed' % cmd)

    candidates = (entry for entry in listing.splitlines()
                  if 'sysdefault' in entry)
    return next(candidates, None)
249
250
def get_sysdefault_playback_device():
    '''Gets the sysdefault device line from the output of 'aplay -L'.

    @returns: The first line containing 'sysdefault', or None.
    '''

    list_playback_cmd = [APLAY_PATH, '-L']
    return _get_sysdefault(list_playback_cmd)
255
256
def get_sysdefault_record_device():
    '''Gets the sysdefault device line from the output of 'arecord -L'.

    @returns: The first line containing 'sysdefault', or None.
    '''

    list_record_cmd = [ARECORD_PATH, '-L']
    return _get_sysdefault(list_record_cmd)
261
262
def playback(*args, **kwargs):
    '''Builds an aplay command with playback_cmd and executes it.

    @param args: Positional arguments passed to playback_cmd.
    @param kwargs: Keyword arguments passed to playback_cmd.
    '''
    aplay_cmd = playback_cmd(*args, **kwargs)
    cmd_utils.execute(aplay_cmd)
269
270
def playback_cmd(
        input, duration=None, channels=2, bits=16, rate=48000, device=None):
    '''Builds the 'aplay' command line that plays the given input audio.

    @param input: The input audio to be played.
    @param duration: The length of the playback (in seconds).  No duration
                     argument is passed to aplay when it is None.
    @param channels: The number of channels of the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.
    @param device: The device to play the audio on. E.g. hw:0,1.  The name
                   is prefixed with 'plug' before it is handed to aplay.
                   When None, the default playback device is used.
    @returns: The command as a list of arguments.
    @raise RuntimeError: If no playback device is available.
    '''
    duration_args = [] if duration is None else ['-d', str(duration)]
    format_args = _get_format_args(channels, bits, rate)

    if device is not None:
        device = "plug%s" % device
    else:
        device = get_default_playback_device()
        if device is None:
            raise RuntimeError('no playback device')

    return [APLAY_PATH] + duration_args + format_args + ['-D', device, input]
296
297
def record(*args, **kwargs):
    '''Builds an arecord command with record_cmd and executes it.

    @param args: Positional arguments passed to record_cmd.
    @param kwargs: Keyword arguments passed to record_cmd.
    '''
    arecord_cmd = record_cmd(*args, **kwargs)
    cmd_utils.execute(arecord_cmd)
304
305
def record_cmd(
        output, duration=None, channels=1, bits=16, rate=48000, device=None):
    '''Builds the 'arecord' command line that records to the given output.

    @param output: The filename where the recorded audio will be stored to.
    @param duration: The length of the recording (in seconds).  No duration
                     argument is passed to arecord when it is None.
    @param channels: The number of channels of the recorded audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.
    @param device: The device used to record the audio from. E.g. hw:0,1.
                   The name is prefixed with 'plug' before it is handed to
                   arecord.  When None, the default record device is used.
    @returns: The command as a list of arguments.
    @raise RuntimeError: If no record device is available.
    '''
    duration_args = [] if duration is None else ['-d', str(duration)]
    format_args = _get_format_args(channels, bits, rate)

    if device is not None:
        device = "plug%s" % device
    else:
        device = get_default_record_device()
        if device is None:
            raise RuntimeError('no record device')

    return [ARECORD_PATH] + duration_args + format_args + ['-D', device, output]
331
332
def mixer_cmd(card_id, cmd):
    '''Runs an amixer command on a soundcard and returns its output.

    @param card_id: Soundcard ID.
    @param cmd: Amixer command to execute, as a list of arguments.
    @returns: The standard output of the command.
    @raise RuntimeError: If the command exits with non-zero status.

    For example, the command ['set', 'PCM', '2dB+'] with card_id 1 is
    executed as:
        amixer -c 1 set PCM 2dB+
    '''

    amixer_cmd = [AMIXER_PATH, '-c', str(card_id)] + cmd
    amixer = cmd_utils.popen(amixer_cmd, stdout=subprocess.PIPE)
    stdout_data = amixer.communicate()[0]
    if amixer.wait() != 0:
        raise RuntimeError('amixer command failed')
    return stdout_data
353
354
def get_num_seq_clients():
    '''Returns the number of seq clients.

    The clients are counted from the output of 'aconnect -io': every line
    that starts with 'client <number>:' counts as one client.  The command
    is run as the chronos user to catch permissions problems.
    Sample content:

      client 0: 'System' [type=kernel]
          0 'Timer           '
          1 'Announce        '
      client 14: 'Midi Through' [type=kernel]
          0 'Midi Through Port-0'

    Errors raised by cmd_utils.execute are not caught here.
    '''
    listing = cmd_utils.execute([ACONNECT_PATH, '-io'],
                                stdout=subprocess.PIPE, run_as='chronos')
    return sum(1 for entry in listing.splitlines()
               if CLIENT_NUM_RE.match(entry))
378
def convert_device_name(cras_device_name):
    '''Converts cras device name to alsa device name.

    The cras name is split on ':' and the third field is taken as the alsa
    "<card>,<device>" pair.

    @param cras_device_name: The cras device name,
                             e.g. "kbl_r5514_5663_max: :0,1".
    @returns: alsa device name that can be passed to aplay -D or arecord -D.
              For the example above this is "hw:0,1".
    '''
    card_and_device = cras_device_name.split(":")[2]
    return "hw:" + card_and_device
388
def check_audio_stream_at_selected_device(device_name, device_type):
    """Checks that audio is streaming at the expected output node.

    For a BLUETOOTH device the "Output dev:" line of the cras audio thread
    dump is compared against device_name.  For any other device type the
    card and device numbers are parsed from device_name and the status file
    of that PCM under /proc/asound is inspected.

    @param device_name: Audio output device name, Ex: kbl_r5514_5663_max: :0,1
    @param device_type: Audio output device type, Ex: INTERNAL_SPEAKER

    @raise error.TestFail: If audio is not routed through the expected node.
    @raise error.TestError: If device_name has no ':<card>,<device>' part, or
                            the device status is neither RUNNING nor closed.
    """
    if device_type == 'BLUETOOTH':
        output_device_output = utils.system_output(OUTPUT_DEVICE_CMD).strip()
        bt_device = output_device_output.split('Output dev:')[1].strip()
        if bt_device != device_name:
            raise error.TestFail("Audio is not routing through expected node")
        logging.info('Audio is routing through %s', bt_device)
    else:
        # \d+ rather than \d so that card/device numbers of 10 and above are
        # not truncated or rejected.
        card_device_search = re.search(r':(\d+),(\d+)', device_name)
        # Fail before the numbers are used: without a match they would be
        # unbound and the function would die with UnboundLocalError instead
        # of the intended TestError.
        if card_device_search is None:
            raise error.TestError("Audio device name is not in expected format")
        card_num, device_num = card_device_search.groups()
        logging.debug("Sound card number is %s", card_num)
        logging.debug("Device number is %s", device_num)
        device_status_output = utils.system_output(AUDIO_DEVICE_STATUS_CMD %
                                                   (card_num, device_num))
        logging.debug("Selected output device status is %s",
                      device_status_output)

        if 'RUNNING' in device_status_output:
            logging.info("Audio is routing through expected node!")
        elif 'closed' in device_status_output:
            raise error.TestFail("Audio is not routing through expected audio "
                                 "node!")
        else:
            raise error.TestError("Audio routing error! Device may be "
                                  "preparing")