1# Copyright (c) 2013 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""This module provides cras audio utilities.""" 6 7import logging 8import re 9 10from autotest_lib.client.cros.audio import cmd_utils 11 12_CRAS_TEST_CLIENT = '/usr/bin/cras_test_client' 13 14 15class CrasUtilsError(Exception): 16 pass 17 18 19def playback(blocking=True, *args, **kargs): 20 """A helper function to execute the playback_cmd. 21 22 @param blocking: Blocks this call until playback finishes. 23 @param args: args passed to playback_cmd. 24 @param kargs: kargs passed to playback_cmd. 25 26 @returns: The process running the playback command. Note that if the 27 blocking parameter is true, this will return a finished process. 28 """ 29 process = cmd_utils.popen(playback_cmd(*args, **kargs)) 30 if blocking: 31 cmd_utils.wait_and_check_returncode(process) 32 return process 33 34 35def capture(*args, **kargs): 36 """A helper function to execute the capture_cmd. 37 38 @param args: args passed to capture_cmd. 39 @param kargs: kargs passed to capture_cmd. 40 41 """ 42 cmd_utils.execute(capture_cmd(*args, **kargs)) 43 44 45def playback_cmd(playback_file, block_size=None, duration=None, 46 channels=2, rate=48000): 47 """Gets a command to playback a file with given settings. 48 49 @param playback_file: the name of the file to play. '-' indicates to 50 playback raw audio from the stdin. 51 @param block_size: the number of frames per callback(dictates latency). 52 @param duration: seconds to playback 53 @param channels: number of channels. 54 @param rate: the sampling rate 55 56 @returns: The command args put in a list of strings. 57 58 """ 59 args = [_CRAS_TEST_CLIENT] 60 args += ['--playback_file', playback_file] 61 if block_size is not None: 62 args += ['--block_size', str(block_size)] 63 if duration is not None: 64 args += ['--duration', str(duration)] 65 args += ['--num_channels', str(channels)] 66 args += ['--rate', str(rate)] 67 return args 68 69 70def capture_cmd( 71 capture_file, block_size=None, duration=10, channels=1, rate=48000): 72 """Gets a command to capture the audio into the file with given settings. 73 74 @param capture_file: the name of file the audio to be stored in. 75 @param block_size: the number of frames per callback(dictates latency). 76 @param duration: seconds to record. If it is None, duration is not set, 77 and command will keep capturing audio until it is 78 terminated. 79 @param channels: number of channels. 80 @param rate: the sampling rate. 81 82 @returns: The command args put in a list of strings. 83 84 """ 85 args = [_CRAS_TEST_CLIENT] 86 args += ['--capture_file', capture_file] 87 if block_size is not None: 88 args += ['--block_size', str(block_size)] 89 if duration is not None: 90 args += ['--duration', str(duration)] 91 args += ['--num_channels', str(channels)] 92 args += ['--rate', str(rate)] 93 return args 94 95 96def loopback(*args, **kargs): 97 """A helper function to execute loopback_cmd. 98 99 @param args: args passed to loopback_cmd. 100 @param kargs: kargs passed to loopback_cmd. 101 102 """ 103 104 cmd_utils.execute(loopback_cmd(*args, **kargs)) 105 106 107def loopback_cmd(output_file, duration=10, channels=2, rate=48000): 108 """Gets a command to record the loopback. 109 110 @param output_file: The name of the file the loopback to be stored in. 111 @param channels: The number of channels of the recorded audio. 112 @param duration: seconds to record. 113 @param rate: the sampling rate. 114 115 @returns: The command args put in a list of strings. 116 117 """ 118 args = [_CRAS_TEST_CLIENT] 119 args += ['--loopback_file', output_file] 120 args += ['--duration_seconds', str(duration)] 121 args += ['--num_channels', str(channels)] 122 args += ['--rate', str(rate)] 123 return args 124 125 126def get_cras_nodes_cmd(): 127 """Gets a command to query the nodes from Cras. 128 129 @returns: The command to query nodes information from Cras using dbus-send. 130 131 """ 132 return ('dbus-send --system --type=method_call --print-reply ' 133 '--dest=org.chromium.cras /org/chromium/cras ' 134 'org.chromium.cras.Control.GetNodes') 135 136 137def set_system_volume(volume): 138 """Set the system volume. 139 140 @param volume: the system output vlume to be set(0 - 100). 141 142 """ 143 get_cras_control_interface().SetOutputVolume(volume) 144 145 146def set_node_volume(node_id, volume): 147 """Set the volume of the given output node. 148 149 @param node_id: the id of the output node to be set the volume. 150 @param volume: the volume to be set(0-100). 151 152 """ 153 get_cras_control_interface().SetOutputNodeVolume(node_id, volume) 154 155 156def set_capture_gain(gain): 157 """Set the system capture gain. 158 159 @param gain the capture gain in db*100 (100 = 1dB) 160 161 """ 162 get_cras_control_interface().SetInputGain(gain) 163 164 165def get_cras_control_interface(private=False): 166 """Gets Cras DBus control interface. 167 168 @param private: Set to True to use a new instance for dbus.SystemBus 169 instead of the shared instance. 170 171 @returns: A dBus.Interface object with Cras Control interface. 172 173 @raises: ImportError if this is not called on Cros device. 174 175 """ 176 try: 177 import dbus 178 except ImportError, e: 179 logging.exception( 180 'Can not import dbus: %s. This method should only be ' 181 'called on Cros device.', e) 182 raise 183 bus = dbus.SystemBus(private=private) 184 cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras') 185 return dbus.Interface(cras_object, 'org.chromium.cras.Control') 186 187 188def get_cras_nodes(): 189 """Gets nodes information from Cras. 190 191 @returns: A dict containing information of each node. 192 193 """ 194 return get_cras_control_interface().GetNodes() 195 196 197def get_selected_nodes(): 198 """Gets selected output nodes and input nodes. 199 200 @returns: A tuple (output_nodes, input_nodes) where each 201 field is a list of selected node IDs returned from Cras DBus API. 202 Note that there may be multiple output/input nodes being selected 203 at the same time. 204 205 """ 206 output_nodes = [] 207 input_nodes = [] 208 nodes = get_cras_nodes() 209 for node in nodes: 210 if node['Active']: 211 if node['IsInput']: 212 input_nodes.append(node['Id']) 213 else: 214 output_nodes.append(node['Id']) 215 return (output_nodes, input_nodes) 216 217 218def set_selected_output_node_volume(volume): 219 """Sets the selected output node volume. 220 221 @param volume: the volume to be set (0-100). 222 223 """ 224 selected_output_node_ids, _ = get_selected_nodes() 225 for node_id in selected_output_node_ids: 226 set_node_volume(node_id, volume) 227 228 229def get_active_stream_count(): 230 """Gets the number of active streams. 231 232 @returns: The number of active streams. 233 234 """ 235 return int(get_cras_control_interface().GetNumberOfActiveStreams()) 236 237 238def set_system_mute(is_mute): 239 """Sets the system mute switch. 240 241 @param is_mute: Set True to mute the system playback. 242 243 """ 244 get_cras_control_interface().SetOutputMute(is_mute) 245 246 247def set_capture_mute(is_mute): 248 """Sets the capture mute switch. 249 250 @param is_mute: Set True to mute the capture. 251 252 """ 253 get_cras_control_interface().SetInputMute(is_mute) 254 255 256def node_type_is_plugged(node_type, nodes_info): 257 """Determine if there is any node of node_type plugged. 258 259 This method is used in has_loopback_dongle in cros_host, where 260 the call is executed on autotest server. Use get_cras_nodes instead if 261 the call can be executed on Cros device. 262 263 Since Cras only reports the plugged node in GetNodes, we can 264 parse the return value to see if there is any node with the given type. 265 For example, if INTERNAL_MIC is of intereset, the pattern we are 266 looking for is: 267 268 dict entry( 269 string "Type" 270 variant string "INTERNAL_MIC" 271 ) 272 273 @param node_type: A str representing node type defined in CRAS_NODE_TYPES. 274 @param nodes_info: A str containing output of command get_nodes_cmd. 275 276 @returns: True if there is any node of node_type plugged. False otherwise. 277 278 """ 279 match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type, 280 nodes_info) 281 return True if match else False 282 283 284# Cras node types reported from Cras DBus control API. 285CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB', 286 'BLUETOOTH', 'UNKNOWN'] 287CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH', 288 'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN', 289 'KEYBOARD_MIC', 'HOTWORD'] 290CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES 291 292 293def get_filtered_node_types(callback): 294 """Returns the pair of filtered output node types and input node types. 295 296 @param callback: A callback function which takes a node as input parameter 297 and filter the node based on its return value. 298 299 @returns: A tuple (output_node_types, input_node_types) where each 300 field is a list of node types defined in CRAS_NODE_TYPES, 301 and their 'attribute_name' is True. 302 303 """ 304 output_node_types = [] 305 input_node_types = [] 306 nodes = get_cras_nodes() 307 for node in nodes: 308 if callback(node): 309 node_type = str(node['Type']) 310 if node_type not in CRAS_NODE_TYPES: 311 raise RuntimeError( 312 'node type %s is not valid' % node_type) 313 if node['IsInput']: 314 input_node_types.append(node_type) 315 else: 316 output_node_types.append(node_type) 317 return (output_node_types, input_node_types) 318 319 320def get_selected_node_types(): 321 """Returns the pair of active output node types and input node types. 322 323 @returns: A tuple (output_node_types, input_node_types) where each 324 field is a list of selected node types defined in CRAS_NODE_TYPES. 325 326 """ 327 def is_selected(node): 328 """Checks if a node is selected. 329 330 A node is selected if its Active attribute is True. 331 332 @returns: True is a node is selected, False otherwise. 333 334 """ 335 return node['Active'] 336 337 return get_filtered_node_types(is_selected) 338 339 340def get_plugged_node_types(): 341 """Returns the pair of plugged output node types and input node types. 342 343 @returns: A tuple (output_node_types, input_node_types) where each 344 field is a list of plugged node types defined in CRAS_NODE_TYPES. 345 346 """ 347 def is_plugged(node): 348 """Checks if a node is plugged and is not unknown node. 349 350 Cras DBus API only reports plugged node, so every node reported by Cras 351 DBus API is plugged. However, we filter out UNKNOWN node here because 352 the existence of unknown node depends on the number of redundant 353 playback/record audio device created on audio card. Also, the user of 354 Cras will ignore unknown nodes. 355 356 @returns: True if a node is plugged and is not an UNKNOWN node. 357 358 """ 359 return node['Type'] != 'UNKNOWN' 360 361 return get_filtered_node_types(is_plugged) 362 363 364def set_selected_node_types(output_node_types, input_node_types): 365 """Sets selected node types. 366 367 @param output_node_types: A list of output node types. None to skip setting. 368 @param input_node_types: A list of input node types. None to skip setting. 369 370 """ 371 if len(output_node_types) == 1: 372 set_single_selected_output_node(output_node_types[0]) 373 elif output_node_types: 374 set_selected_output_nodes(output_node_types) 375 if len(input_node_types) == 1: 376 set_single_selected_input_node(input_node_types[0]) 377 elif input_node_types: 378 set_selected_input_nodes(input_node_types) 379 380 381def set_single_selected_output_node(node_type): 382 """Sets one selected output node. 383 384 Note that Chrome UI uses SetActiveOutputNode of Cras DBus API 385 to select one output node. 386 387 @param node_type: A node type. 388 389 """ 390 nodes = get_cras_nodes() 391 for node in nodes: 392 if node['IsInput']: 393 continue 394 if node['Type'] == node_type: 395 set_active_output_node(node['Id']) 396 397 398def set_single_selected_input_node(node_type): 399 """Sets one selected input node. 400 401 Note that Chrome UI uses SetActiveInputNode of Cras DBus API 402 to select one input node. 403 404 @param node_type: A node type. 405 406 """ 407 nodes = get_cras_nodes() 408 for node in nodes: 409 if not node['IsInput']: 410 continue 411 if node['Type'] == node_type: 412 set_active_input_node(node['Id']) 413 414 415def set_selected_output_nodes(types): 416 """Sets selected output node types. 417 418 Note that Chrome UI uses SetActiveOutputNode of Cras DBus API 419 to select one output node. Here we use add/remove active output node 420 to support multiple nodes. 421 422 @param types: A list of output node types. 423 424 """ 425 nodes = get_cras_nodes() 426 for node in nodes: 427 if node['IsInput']: 428 continue 429 if node['Type'] in types: 430 add_active_output_node(node['Id']) 431 elif node['Active']: 432 remove_active_output_node(node['Id']) 433 434 435def set_selected_input_nodes(types): 436 """Sets selected input node types. 437 438 Note that Chrome UI uses SetActiveInputNode of Cras DBus API 439 to select one input node. Here we use add/remove active input node 440 to support multiple nodes. 441 442 @param types: A list of input node types. 443 444 """ 445 nodes = get_cras_nodes() 446 for node in nodes: 447 if not node['IsInput']: 448 continue 449 if node['Type'] in types: 450 add_active_input_node(node['Id']) 451 elif node['Active']: 452 remove_active_input_node(node['Id']) 453 454 455def set_active_input_node(node_id): 456 """Sets one active input node. 457 458 @param node_id: node id. 459 460 """ 461 get_cras_control_interface().SetActiveInputNode(node_id) 462 463 464def set_active_output_node(node_id): 465 """Sets one active output node. 466 467 @param node_id: node id. 468 469 """ 470 get_cras_control_interface().SetActiveOutputNode(node_id) 471 472 473def add_active_output_node(node_id): 474 """Adds an active output node. 475 476 @param node_id: node id. 477 478 """ 479 get_cras_control_interface().AddActiveOutputNode(node_id) 480 481 482def add_active_input_node(node_id): 483 """Adds an active input node. 484 485 @param node_id: node id. 486 487 """ 488 get_cras_control_interface().AddActiveInputNode(node_id) 489 490 491def remove_active_output_node(node_id): 492 """Removes an active output node. 493 494 @param node_id: node id. 495 496 """ 497 get_cras_control_interface().RemoveActiveOutputNode(node_id) 498 499 500def remove_active_input_node(node_id): 501 """Removes an active input node. 502 503 @param node_id: node id. 504 505 """ 506 get_cras_control_interface().RemoveActiveInputNode(node_id) 507 508 509def get_node_id_from_node_type(node_type, is_input): 510 """Gets node id from node type. 511 512 @param types: A node type defined in CRAS_NODE_TYPES. 513 @param is_input: True if the node is input. False otherwise. 514 515 @returns: A string for node id. 516 517 @raises: CrasUtilsError: if unique node id can not be found. 518 519 """ 520 nodes = get_cras_nodes() 521 find_ids = [] 522 for node in nodes: 523 if node['Type'] == node_type and node['IsInput'] == is_input: 524 find_ids.append(node['Id']) 525 if len(find_ids) != 1: 526 raise CrasUtilsError( 527 'Can not find unique node id from node type %s' % node_type) 528 return find_ids[0] 529 530def get_active_node_volume(): 531 """Returns volume from active node. 532 533 @returns: int for volume 534 535 @raises: CrasUtilsError: if node volume cannot be found. 536 """ 537 nodes = get_cras_nodes() 538 for node in nodes: 539 if node['Active'] == 1 and node['IsInput'] == 0: 540 return int(node['NodeVolume']) 541 raise CrasUtilsError('Cannot find active node volume from nodes.') 542