1# Lint as: python2, python3 2# Copyright (c) 2013 The Chromium Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import logging 11import re 12import subprocess 13 14from autotest_lib.client.common_lib import error 15from autotest_lib.client.common_lib import utils 16from autotest_lib.client.cros.audio import cmd_utils 17from six.moves import range 18 19 20ACONNECT_PATH = '/usr/bin/aconnect' 21ARECORD_PATH = '/usr/bin/arecord' 22APLAY_PATH = '/usr/bin/aplay' 23AMIXER_PATH = '/usr/bin/amixer' 24CARD_NUM_RE = re.compile(r'(\d+) \[.*\]:') 25CLIENT_NUM_RE = re.compile(r'client (\d+):') 26DEV_NUM_RE = re.compile(r'.* \[.*\], device (\d+):') 27CONTROL_NAME_RE = re.compile(r"name='(.*)'") 28SCONTROL_NAME_RE = re.compile(r"Simple mixer control '(.*)'") 29AUDIO_DEVICE_STATUS_CMD = 'cat /proc/asound/card%s/pcm%sp/sub0/status' 30OUTPUT_DEVICE_CMD = 'cras_test_client --dump_audio_thread | grep "Output dev:"' 31 32CARD_PREF_RECORD_DEV_IDX = { 33 'bxtda7219max': 3, 34} 35 36def _get_format_args(channels, bits, rate): 37 args = ['-c', str(channels)] 38 args += ['-f', 'S%d_LE' % bits] 39 args += ['-r', str(rate)] 40 return args 41 42 43def get_num_soundcards(): 44 '''Returns the number of soundcards. 45 46 Number of soundcards is parsed from /proc/asound/cards. 47 Sample content: 48 49 0 [PCH ]: HDA-Intel - HDA Intel PCH 50 HDA Intel PCH at 0xef340000 irq 103 51 1 [NVidia ]: HDA-Intel - HDA NVidia 52 HDA NVidia at 0xef080000 irq 36 53 ''' 54 55 card_id = None 56 with open('/proc/asound/cards', 'r') as f: 57 for line in f: 58 match = CARD_NUM_RE.search(line) 59 if match: 60 card_id = int(match.group(1)) 61 if card_id is None: 62 return 0 63 else: 64 return card_id + 1 65 66 67def _get_soundcard_controls(card_id): 68 '''Gets the controls for a soundcard. 69 70 @param card_id: Soundcard ID. 71 @raise RuntimeError: If failed to get soundcard controls. 72 73 Controls for a soundcard is retrieved by 'amixer controls' command. 74 amixer output format: 75 76 numid=32,iface=CARD,name='Front Headphone Jack' 77 numid=28,iface=CARD,name='Front Mic Jack' 78 numid=1,iface=CARD,name='HDMI/DP,pcm=3 Jack' 79 numid=8,iface=CARD,name='HDMI/DP,pcm=7 Jack' 80 81 Controls with iface=CARD are parsed from the output and returned in a set. 82 ''' 83 84 cmd = [AMIXER_PATH, '-c', str(card_id), 'controls'] 85 p = cmd_utils.popen(cmd, stdout=subprocess.PIPE) 86 output, _ = p.communicate() 87 if p.wait() != 0: 88 raise RuntimeError('amixer command failed') 89 90 controls = set() 91 for line in output.splitlines(): 92 if not 'iface=CARD' in line: 93 continue 94 match = CONTROL_NAME_RE.search(line) 95 if match: 96 controls.add(match.group(1)) 97 return controls 98 99 100def _get_soundcard_scontrols(card_id): 101 '''Gets the simple mixer controls for a soundcard. 102 103 @param card_id: Soundcard ID. 104 @raise RuntimeError: If failed to get soundcard simple mixer controls. 105 106 # TODO b:169251326 terms below are set outside of this codebase 107 # and should be updated when possible. ("Master" -> "Main") 108 Simple mixer controls for a soundcard is retrieved by 'amixer scontrols' 109 command. amixer output format: 110 111 Simple mixer control 'Master',0 112 Simple mixer control 'Headphone',0 113 Simple mixer control 'Speaker',0 114 Simple mixer control 'PCM',0 115 116 Simple controls are parsed from the output and returned in a set. 117 ''' 118 119 cmd = [AMIXER_PATH, '-c', str(card_id), 'scontrols'] 120 p = cmd_utils.popen(cmd, stdout=subprocess.PIPE) 121 output, _ = p.communicate() 122 if p.wait() != 0: 123 raise RuntimeError('amixer command failed') 124 125 scontrols = set() 126 for line in output.splitlines(): 127 match = SCONTROL_NAME_RE.findall(line) 128 if match: 129 scontrols.add(match[0]) 130 return scontrols 131 132 133def get_first_soundcard_with_control(cname, scname): 134 '''Returns the soundcard ID with matching control name. 135 136 @param cname: Control name to look for. 137 @param scname: Simple control name to look for. 138 ''' 139 140 cpat = re.compile(r'\b%s\b' % cname, re.IGNORECASE) 141 scpat = re.compile(r'\b%s\b' % scname, re.IGNORECASE) 142 for card_id in range(get_num_soundcards()): 143 for pat, func in [(cpat, _get_soundcard_controls), 144 (scpat, _get_soundcard_scontrols)]: 145 if any(pat.search(c) for c in func(card_id)): 146 return card_id 147 return None 148 149 150def get_soundcard_names(): 151 '''Returns a dictionary of card names, keyed by card number.''' 152 153 cmd = "alsa_helpers -l" 154 try: 155 output = utils.system_output(command=cmd, retain_output=True) 156 except error.CmdError: 157 raise RuntimeError('alsa_helpers -l failed to return card names') 158 159 return dict((index, name) for index, name in ( 160 line.split(',') for line in output.splitlines())) 161 162 163def get_default_playback_device(): 164 '''Gets the first playback device. 165 166 Returns the first playback device or None if it fails to find one. 167 ''' 168 169 card_id = get_first_soundcard_with_control(cname='Headphone Jack', 170 scname='Headphone') 171 if card_id is None: 172 return None 173 return 'plughw:%d' % card_id 174 175def get_record_card_name(card_idx): 176 '''Gets the recording sound card name for given card idx. 177 178 Returns the card name inside the square brackets of arecord output lines. 179 ''' 180 card_name_re = re.compile(r'card %d: .*?\[(.*?)\]' % card_idx) 181 cmd = [ARECORD_PATH, '-l'] 182 p = cmd_utils.popen(cmd, stdout=subprocess.PIPE) 183 output, _ = p.communicate() 184 if p.wait() != 0: 185 raise RuntimeError('arecord -l command failed') 186 187 for line in output.splitlines(): 188 match = card_name_re.search(line) 189 if match: 190 return match.group(1) 191 return None 192 193 194def get_record_device_supported_channels(device): 195 '''Gets the supported channels for the record device. 196 197 @param device: The device to record the audio. E.g. hw:0,1 198 199 Returns the supported values in integer in a list for the device. 200 If the value doesn't exist or the command fails, return None. 201 ''' 202 cmd = "alsa_helpers --device %s --get_capture_channels" % device 203 try: 204 output = utils.system_output(command=cmd, retain_output=True) 205 except error.CmdError: 206 logging.error("Fail to get supported channels for %s", device) 207 return None 208 209 supported_channels = output.splitlines() 210 if not supported_channels: 211 logging.error("Supported channels are empty for %s", device) 212 return None 213 return [int(i) for i in supported_channels] 214 215 216def get_default_record_device(): 217 '''Gets the first record device. 218 219 Returns the first record device or None if it fails to find one. 220 ''' 221 222 card_id = get_first_soundcard_with_control(cname='Mic Jack', scname='Mic') 223 if card_id is None: 224 return None 225 226 card_name = get_record_card_name(card_id) 227 if card_name in CARD_PREF_RECORD_DEV_IDX: 228 return 'plughw:%d,%d' % (card_id, CARD_PREF_RECORD_DEV_IDX[card_name]) 229 230 # Get first device id of this card. 231 cmd = [ARECORD_PATH, '-l'] 232 p = cmd_utils.popen(cmd, stdout=subprocess.PIPE) 233 output, _ = p.communicate() 234 if p.wait() != 0: 235 raise RuntimeError('arecord -l command failed') 236 237 dev_id = 0 238 for line in output.splitlines(): 239 if 'card %d:' % card_id in line: 240 match = DEV_NUM_RE.search(line) 241 if match: 242 dev_id = int(match.group(1)) 243 break 244 return 'plughw:%d,%d' % (card_id, dev_id) 245 246 247def _get_sysdefault(cmd): 248 p = cmd_utils.popen(cmd, stdout=subprocess.PIPE) 249 output, _ = p.communicate() 250 if p.wait() != 0: 251 raise RuntimeError('%s failed' % cmd) 252 253 for line in output.splitlines(): 254 if 'sysdefault' in line: 255 return line 256 return None 257 258 259def get_sysdefault_playback_device(): 260 '''Gets the sysdefault device from aplay -L output.''' 261 262 return _get_sysdefault([APLAY_PATH, '-L']) 263 264 265def get_sysdefault_record_device(): 266 '''Gets the sysdefault device from arecord -L output.''' 267 268 return _get_sysdefault([ARECORD_PATH, '-L']) 269 270 271def playback(*args, **kwargs): 272 '''A helper funciton to execute playback_cmd. 273 274 @param kwargs: kwargs passed to playback_cmd. 275 ''' 276 cmd_utils.execute(playback_cmd(*args, **kwargs)) 277 278 279def playback_cmd( 280 input, duration=None, channels=2, bits=16, rate=48000, device=None): 281 '''Plays the given input audio by the ALSA utility: 'aplay'. 282 283 @param input: The input audio to be played. 284 @param duration: The length of the playback (in seconds). 285 @param channels: The number of channels of the input audio. 286 @param bits: The number of bits of each audio sample. 287 @param rate: The sampling rate. 288 @param device: The device to play the audio on. E.g. hw:0,1 289 @raise RuntimeError: If no playback device is available. 290 ''' 291 args = [APLAY_PATH] 292 if duration is not None: 293 args += ['-d', str(duration)] 294 args += _get_format_args(channels, bits, rate) 295 if device is None: 296 device = get_default_playback_device() 297 if device is None: 298 raise RuntimeError('no playback device') 299 else: 300 device = "plug%s" % device 301 args += ['-D', device] 302 args += [input] 303 return args 304 305 306def record(*args, **kwargs): 307 '''A helper function to execute record_cmd. 308 309 @param kwargs: kwargs passed to record_cmd. 310 ''' 311 cmd_utils.execute(record_cmd(*args, **kwargs)) 312 313 314def record_cmd( 315 output, duration=None, channels=1, bits=16, rate=48000, device=None): 316 '''Records the audio to the specified output by ALSA utility: 'arecord'. 317 318 @param output: The filename where the recorded audio will be stored to. 319 @param duration: The length of the recording (in seconds). 320 @param channels: The number of channels of the recorded audio. 321 @param bits: The number of bits of each audio sample. 322 @param rate: The sampling rate. 323 @param device: The device used to recorded the audio from. E.g. hw:0,1 324 @raise RuntimeError: If no record device is available. 325 ''' 326 args = [ARECORD_PATH] 327 if duration is not None: 328 args += ['-d', str(duration)] 329 args += _get_format_args(channels, bits, rate) 330 if device is None: 331 device = get_default_record_device() 332 if device is None: 333 raise RuntimeError('no record device') 334 else: 335 device = "plug%s" % device 336 args += ['-D', device] 337 args += [output] 338 return args 339 340 341def mixer_cmd(card_id, cmd): 342 '''Executes amixer command. 343 344 @param card_id: Soundcard ID. 345 @param cmd: Amixer command to execute. 346 @raise RuntimeError: If failed to execute command. 347 348 Amixer command like ['set', 'PCM', '2dB+'] with card_id 1 will be executed 349 as: 350 amixer -c 1 set PCM 2dB+ 351 352 Command output will be returned if any. 353 ''' 354 355 cmd = [AMIXER_PATH, '-c', str(card_id)] + cmd 356 p = cmd_utils.popen(cmd, stdout=subprocess.PIPE) 357 output, _ = p.communicate() 358 if p.wait() != 0: 359 raise RuntimeError('amixer command failed') 360 return output 361 362 363def get_num_seq_clients(): 364 '''Returns the number of seq clients. 365 366 The number of clients is parsed from aconnect -io. 367 This is run as the chronos user to catch permissions problems. 368 Sample content: 369 370 client 0: 'System' [type=kernel] 371 0 'Timer ' 372 1 'Announce ' 373 client 14: 'Midi Through' [type=kernel] 374 0 'Midi Through Port-0' 375 376 @raise RuntimeError: If no seq device is available. 377 ''' 378 cmd = [ACONNECT_PATH, '-io'] 379 output = cmd_utils.execute(cmd, stdout=subprocess.PIPE, run_as='chronos') 380 num_clients = 0 381 for line in output.splitlines(): 382 match = CLIENT_NUM_RE.match(line) 383 if match: 384 num_clients += 1 385 return num_clients 386 387def convert_device_name(cras_device_name): 388 '''Converts cras device name to alsa device name. 389 390 @returns: alsa device name that can be passed to aplay -D or arecord -D. 391 For example, if cras_device_name is "kbl_r5514_5663_max: :0,1", 392 this function will return "hw:0,1". 393 ''' 394 tokens = cras_device_name.split(":") 395 return "hw:%s" % tokens[2] 396 397def check_audio_stream_at_selected_device(device_name, device_type): 398 """Checks the audio output at expected node 399 400 @param device_name: Audio output device name, Ex: kbl_r5514_5663_max: :0,1 401 @param device_type: Audio output device type, Ex: INTERNAL_SPEAKER 402 """ 403 if device_type == 'BLUETOOTH': 404 output_device_output = utils.system_output(OUTPUT_DEVICE_CMD).strip() 405 bt_device = output_device_output.split('Output dev:')[1].strip() 406 if bt_device != device_name: 407 raise error.TestFail("Audio is not routing through expected node") 408 logging.info('Audio is routing through %s', bt_device) 409 else: 410 card_device_search = re.search(r':(\d),(\d)', device_name) 411 if card_device_search: 412 card_num = card_device_search.group(1) 413 device_num = card_device_search.group(2) 414 logging.debug("Sound card number is %s", card_num) 415 logging.debug("Device number is %s", device_num) 416 if card_num is None or device_num is None: 417 raise error.TestError("Audio device name is not in expected format") 418 device_status_output = utils.system_output(AUDIO_DEVICE_STATUS_CMD % 419 (card_num, device_num)) 420 logging.debug("Selected output device status is %s", 421 device_status_output) 422 423 if 'RUNNING' in device_status_output: 424 logging.info("Audio is routing through expected node!") 425 elif 'closed' in device_status_output: 426 raise error.TestFail("Audio is not routing through expected audio " 427 "node!") 428 else: 429 raise error.TestError("Audio routing error! Device may be " 430 "preparing")