# Copyright (c) 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This module provides cras audio utilities."""

import logging
import re

from autotest_lib.client.cros.audio import cmd_utils

_CRAS_TEST_CLIENT = '/usr/bin/cras_test_client'


class CrasUtilsError(Exception):
    """Error raised by the helpers in this module."""
    pass


def playback(*args, **kargs):
    """A helper function to execute the playback_cmd.

    @param args: args passed to playback_cmd.
    @param kargs: kargs passed to playback_cmd.

    """
    cmd_utils.execute(playback_cmd(*args, **kargs))


def capture(*args, **kargs):
    """A helper function to execute the capture_cmd.

    @param args: args passed to capture_cmd.
    @param kargs: kargs passed to capture_cmd.

    """
    cmd_utils.execute(capture_cmd(*args, **kargs))


def playback_cmd(playback_file, block_size=None, duration=None,
                 channels=2, rate=48000):
    """Gets a command to playback a file with given settings.

    @param playback_file: the name of the file to play. '-' indicates to
                          playback raw audio from the stdin.
    @param block_size: the number of frames per callback(dictates latency).
                       If it is None, the option is not passed.
    @param duration: seconds to playback. If it is None, the option is not
                     passed.
    @param channels: number of channels.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    args = [_CRAS_TEST_CLIENT]
    args += ['--playback_file', playback_file]
    if block_size is not None:
        args += ['--block_size', str(block_size)]
    if duration is not None:
        args += ['--duration', str(duration)]
    args += ['--num_channels', str(channels)]
    args += ['--rate', str(rate)]
    return args


def capture_cmd(
        capture_file, block_size=None, duration=10, channels=1, rate=48000):
    """Gets a command to capture the audio into the file with given settings.

    @param capture_file: the name of file the audio to be stored in.
    @param block_size: the number of frames per callback(dictates latency).
                       If it is None, the option is not passed.
    @param duration: seconds to record. If it is None, duration is not set,
                     and command will keep capturing audio until it is
                     terminated.
    @param channels: number of channels.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    args = [_CRAS_TEST_CLIENT]
    args += ['--capture_file', capture_file]
    if block_size is not None:
        args += ['--block_size', str(block_size)]
    if duration is not None:
        args += ['--duration', str(duration)]
    args += ['--num_channels', str(channels)]
    args += ['--rate', str(rate)]
    return args


def loopback(*args, **kargs):
    """A helper function to execute loopback_cmd.

    @param args: args passed to loopback_cmd.
    @param kargs: kargs passed to loopback_cmd.

    """
    cmd_utils.execute(loopback_cmd(*args, **kargs))


def loopback_cmd(output_file, duration=10, channels=2, rate=48000):
    """Gets a command to record the loopback.

    @param output_file: The name of the file the loopback to be stored in.
    @param duration: seconds to record.
    @param channels: The number of channels of the recorded audio.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    args = [_CRAS_TEST_CLIENT]
    args += ['--loopback_file', output_file]
    # NOTE(review): this passes --duration_seconds while playback_cmd and
    # capture_cmd pass --duration; confirm cras_test_client accepts both.
    args += ['--duration_seconds', str(duration)]
    args += ['--num_channels', str(channels)]
    args += ['--rate', str(rate)]
    return args


def get_cras_nodes_cmd():
    """Gets a command to query the nodes from Cras.

    @returns: The command to query nodes information from Cras using
              dbus-send.

    """
    return ('dbus-send --system --type=method_call --print-reply '
            '--dest=org.chromium.cras /org/chromium/cras '
            'org.chromium.cras.Control.GetNodes')


def set_system_volume(volume):
    """Set the system volume.

    @param volume: the system output volume to be set(0 - 100).

    """
    get_cras_control_interface().SetOutputVolume(volume)


def set_node_volume(node_id, volume):
    """Set the volume of the given output node.

    @param node_id: the id of the output node to be set the volume.
    @param volume: the volume to be set(0-100).

    """
    get_cras_control_interface().SetOutputNodeVolume(node_id, volume)


def set_capture_gain(gain):
    """Set the system capture gain.

    @param gain: the capture gain in db*100 (100 = 1dB).

    """
    get_cras_control_interface().SetInputGain(gain)


def get_cras_control_interface(private=False):
    """Gets Cras DBus control interface.

    @param private: Set to True to use a new instance for dbus.SystemBus
                    instead of the shared instance.

    @returns: A dBus.Interface object with Cras Control interface.

    @raises: ImportError if this is not called on Cros device.

    """
    # dbus is imported lazily so that this module can still be imported on
    # hosts where the dbus package is not available.
    try:
        import dbus
    except ImportError as e:
        logging.exception(
                'Can not import dbus: %s. This method should only be '
                'called on Cros device.', e)
        raise
    bus = dbus.SystemBus(private=private)
    cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras')
    return dbus.Interface(cras_object, 'org.chromium.cras.Control')


def get_cras_nodes():
    """Gets nodes information from Cras.

    @returns: The value returned by GetNodes of the Cras control interface.
              The helpers in this module iterate over it and read keys such
              as 'Id', 'Type', 'Active' and 'IsInput' from each node.

    """
    return get_cras_control_interface().GetNodes()


def get_selected_nodes():
    """Gets selected output nodes and input nodes.

    @returns: A tuple (output_nodes, input_nodes) where each
              field is a list of selected node IDs returned from Cras DBus
              API. Note that there may be multiple output/input nodes being
              selected at the same time.

    """
    output_nodes = []
    input_nodes = []
    for node in get_cras_nodes():
        if not node['Active']:
            continue
        if node['IsInput']:
            input_nodes.append(node['Id'])
        else:
            output_nodes.append(node['Id'])
    return (output_nodes, input_nodes)


def set_selected_output_node_volume(volume):
    """Sets the selected output node volume.

    @param volume: the volume to be set (0-100).

    """
    selected_output_node_ids, _ = get_selected_nodes()
    for node_id in selected_output_node_ids:
        set_node_volume(node_id, volume)


def get_active_stream_count():
    """Gets the number of active streams.

    @returns: The number of active streams.

    """
    return int(get_cras_control_interface().GetNumberOfActiveStreams())


def set_system_mute(is_mute):
    """Sets the system mute switch.

    @param is_mute: Set True to mute the system playback.

    """
    get_cras_control_interface().SetOutputMute(is_mute)


def set_capture_mute(is_mute):
    """Sets the capture mute switch.

    @param is_mute: Set True to mute the capture.

    """
    get_cras_control_interface().SetInputMute(is_mute)


def node_type_is_plugged(node_type, nodes_info):
    """Determine if there is any node of node_type plugged.

    This method is used in has_loopback_dongle in cros_host, where
    the call is executed on autotest server. Use get_cras_nodes instead if
    the call can be executed on Cros device.

    Since Cras only reports the plugged node in GetNodes, we can
    parse the return value to see if there is any node with the given type.
    For example, if INTERNAL_MIC is of interest, the pattern we are
    looking for is:

    dict entry(
       string "Type"
       variant             string "INTERNAL_MIC"
    )

    @param node_type: A str representing node type defined in
                      CRAS_NODE_TYPES. It is interpolated into a regular
                      expression as-is.
    @param nodes_info: A str containing output of command
                       get_cras_nodes_cmd.

    @returns: True if there is any node of node_type plugged. False
              otherwise.

    """
    match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type,
                      nodes_info)
    return match is not None


# Cras node types reported from Cras DBus control API.
CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB',
                          'BLUETOOTH', 'UNKNOWN']
CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH',
                         'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN',
                         'KEYBOARD_MIC', 'AOKR']
CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES


def get_filtered_node_types(callback):
    """Returns the pair of filtered output node types and input node types.

    @param callback: A callback function which takes a node as input
                     parameter. The node is kept if the return value is
                     true and dropped otherwise.

    @returns: A tuple (output_node_types, input_node_types) where each
              field is a list of node types defined in CRAS_NODE_TYPES,
              for the nodes accepted by callback.

    @raises: RuntimeError if an accepted node has a type that is not in
             CRAS_NODE_TYPES.

    """
    output_node_types = []
    input_node_types = []
    for node in get_cras_nodes():
        if not callback(node):
            continue
        node_type = str(node['Type'])
        if node_type not in CRAS_NODE_TYPES:
            raise RuntimeError(
                    'node type %s is not valid' % node_type)
        if node['IsInput']:
            input_node_types.append(node_type)
        else:
            output_node_types.append(node_type)
    return (output_node_types, input_node_types)


def get_selected_node_types():
    """Returns the pair of active output node types and input node types.

    @returns: A tuple (output_node_types, input_node_types) where each
              field is a list of selected node types defined in
              CRAS_NODE_TYPES.

    """
    def is_selected(node):
        """Checks if a node is selected.

        A node is selected if its Active attribute is True.

        @returns: True if a node is selected, False otherwise.

        """
        return node['Active']

    return get_filtered_node_types(is_selected)


def get_plugged_node_types():
    """Returns the pair of plugged output node types and input node types.

    @returns: A tuple (output_node_types, input_node_types) where each
              field is a list of plugged node types defined in
              CRAS_NODE_TYPES.

    """
    def is_plugged(node):
        """Checks if a node is plugged and is not unknown node.

        Cras DBus API only reports plugged node, so every node reported by
        Cras DBus API is plugged. However, we filter out UNKNOWN node here
        because the existence of unknown node depends on the number of
        redundant playback/record audio device created on audio card. Also,
        the user of Cras will ignore unknown nodes.

        @returns: True if a node is plugged and is not an UNKNOWN node.

        """
        return node['Type'] != 'UNKNOWN'

    return get_filtered_node_types(is_plugged)


def set_selected_node_types(output_node_types, input_node_types):
    """Sets selected node types.

    A single type is selected through the "set active node" call, several
    types through the add/remove active node calls.

    @param output_node_types: A list of output node types. None (or an
                              empty list) to skip setting.
    @param input_node_types: A list of input node types. None (or an
                             empty list) to skip setting.

    """
    # The truthiness check comes first so that None is skipped instead of
    # raising TypeError from len(None).
    if output_node_types:
        if len(output_node_types) == 1:
            set_single_selected_output_node(output_node_types[0])
        else:
            set_selected_output_nodes(output_node_types)
    if input_node_types:
        if len(input_node_types) == 1:
            set_single_selected_input_node(input_node_types[0])
        else:
            set_selected_input_nodes(input_node_types)


def set_single_selected_output_node(node_type):
    """Sets one selected output node.

    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
    to select one output node.

    @param node_type: A node type.

    """
    for node in get_cras_nodes():
        if node['IsInput']:
            continue
        if node['Type'] == node_type:
            set_active_output_node(node['Id'])


def set_single_selected_input_node(node_type):
    """Sets one selected input node.

    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
    to select one input node.

    @param node_type: A node type.

    """
    for node in get_cras_nodes():
        if not node['IsInput']:
            continue
        if node['Type'] == node_type:
            set_active_input_node(node['Id'])


def set_selected_output_nodes(types):
    """Sets selected output node types.

    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
    to select one output node. Here we use add/remove active output node
    to support multiple nodes.

    @param types: A list of output node types.

    """
    for node in get_cras_nodes():
        if node['IsInput']:
            continue
        if node['Type'] in types:
            add_active_output_node(node['Id'])
        elif node['Active']:
            # Deselect active nodes whose type was not requested.
            remove_active_output_node(node['Id'])


def set_selected_input_nodes(types):
    """Sets selected input node types.

    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
    to select one input node. Here we use add/remove active input node
    to support multiple nodes.

    @param types: A list of input node types.

    """
    for node in get_cras_nodes():
        if not node['IsInput']:
            continue
        if node['Type'] in types:
            add_active_input_node(node['Id'])
        elif node['Active']:
            # Deselect active nodes whose type was not requested.
            remove_active_input_node(node['Id'])


def set_active_input_node(node_id):
    """Sets one active input node.

    @param node_id: node id.

    """
    get_cras_control_interface().SetActiveInputNode(node_id)


def set_active_output_node(node_id):
    """Sets one active output node.

    @param node_id: node id.

    """
    get_cras_control_interface().SetActiveOutputNode(node_id)


def add_active_output_node(node_id):
    """Adds an active output node.

    @param node_id: node id.

    """
    get_cras_control_interface().AddActiveOutputNode(node_id)


def add_active_input_node(node_id):
    """Adds an active input node.

    @param node_id: node id.

    """
    get_cras_control_interface().AddActiveInputNode(node_id)


def remove_active_output_node(node_id):
    """Removes an active output node.

    @param node_id: node id.

    """
    get_cras_control_interface().RemoveActiveOutputNode(node_id)


def remove_active_input_node(node_id):
    """Removes an active input node.

    @param node_id: node id.

    """
    get_cras_control_interface().RemoveActiveInputNode(node_id)


def get_node_id_from_node_type(node_type, is_input):
    """Gets node id from node type.

    @param node_type: A node type defined in CRAS_NODE_TYPES.
    @param is_input: True if the node is input. False otherwise.

    @returns: The id of the single node matching node_type and is_input.

    @raises: CrasUtilsError: if unique node id can not be found, i.e. no
             node or more than one node matches.

    """
    find_ids = [node['Id'] for node in get_cras_nodes()
                if node['Type'] == node_type and node['IsInput'] == is_input]
    if len(find_ids) != 1:
        raise CrasUtilsError(
                'Can not find unique node id from node type %s' % node_type)
    return find_ids[0]