1# Copyright (c) 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides cras audio utilities."""
6
7import logging
8import re
9
10from autotest_lib.client.cros.audio import cmd_utils
11
12_CRAS_TEST_CLIENT = '/usr/bin/cras_test_client'
13
14
15class CrasUtilsError(Exception):
16    pass
17
18
19def playback(blocking=True, *args, **kargs):
20    """A helper function to execute the playback_cmd.
21
22    @param blocking: Blocks this call until playback finishes.
23    @param args: args passed to playback_cmd.
24    @param kargs: kargs passed to playback_cmd.
25
26    @returns: The process running the playback command. Note that if the
27              blocking parameter is true, this will return a finished process.
28    """
29    process = cmd_utils.popen(playback_cmd(*args, **kargs))
30    if blocking:
31        cmd_utils.wait_and_check_returncode(process)
32    return process
33
34
35def capture(*args, **kargs):
36    """A helper function to execute the capture_cmd.
37
38    @param args: args passed to capture_cmd.
39    @param kargs: kargs passed to capture_cmd.
40
41    """
42    cmd_utils.execute(capture_cmd(*args, **kargs))
43
44
45def playback_cmd(playback_file, block_size=None, duration=None,
46                 channels=2, rate=48000):
47    """Gets a command to playback a file with given settings.
48
49    @param playback_file: the name of the file to play. '-' indicates to
50                          playback raw audio from the stdin.
51    @param block_size: the number of frames per callback(dictates latency).
52    @param duration: seconds to playback
53    @param channels: number of channels.
54    @param rate: the sampling rate
55
56    @returns: The command args put in a list of strings.
57
58    """
59    args = [_CRAS_TEST_CLIENT]
60    args += ['--playback_file', playback_file]
61    if block_size is not None:
62        args += ['--block_size', str(block_size)]
63    if duration is not None:
64        args += ['--duration', str(duration)]
65    args += ['--num_channels', str(channels)]
66    args += ['--rate', str(rate)]
67    return args
68
69
70def capture_cmd(
71        capture_file, block_size=None, duration=10, channels=1, rate=48000):
72    """Gets a command to capture the audio into the file with given settings.
73
74    @param capture_file: the name of file the audio to be stored in.
75    @param block_size: the number of frames per callback(dictates latency).
76    @param duration: seconds to record. If it is None, duration is not set,
77                     and command will keep capturing audio until it is
78                     terminated.
79    @param channels: number of channels.
80    @param rate: the sampling rate.
81
82    @returns: The command args put in a list of strings.
83
84    """
85    args = [_CRAS_TEST_CLIENT]
86    args += ['--capture_file', capture_file]
87    if block_size is not None:
88        args += ['--block_size', str(block_size)]
89    if duration is not None:
90        args += ['--duration', str(duration)]
91    args += ['--num_channels', str(channels)]
92    args += ['--rate', str(rate)]
93    return args
94
95
96def loopback(*args, **kargs):
97    """A helper function to execute loopback_cmd.
98
99    @param args: args passed to loopback_cmd.
100    @param kargs: kargs passed to loopback_cmd.
101
102    """
103
104    cmd_utils.execute(loopback_cmd(*args, **kargs))
105
106
107def loopback_cmd(output_file, duration=10, channels=2, rate=48000):
108    """Gets a command to record the loopback.
109
110    @param output_file: The name of the file the loopback to be stored in.
111    @param channels: The number of channels of the recorded audio.
112    @param duration: seconds to record.
113    @param rate: the sampling rate.
114
115    @returns: The command args put in a list of strings.
116
117    """
118    args = [_CRAS_TEST_CLIENT]
119    args += ['--loopback_file', output_file]
120    args += ['--duration_seconds', str(duration)]
121    args += ['--num_channels', str(channels)]
122    args += ['--rate', str(rate)]
123    return args
124
125
126def get_cras_nodes_cmd():
127    """Gets a command to query the nodes from Cras.
128
129    @returns: The command to query nodes information from Cras using dbus-send.
130
131    """
132    return ('dbus-send --system --type=method_call --print-reply '
133            '--dest=org.chromium.cras /org/chromium/cras '
134            'org.chromium.cras.Control.GetNodes')
135
136
137def set_system_volume(volume):
138    """Set the system volume.
139
140    @param volume: the system output vlume to be set(0 - 100).
141
142    """
143    get_cras_control_interface().SetOutputVolume(volume)
144
145
146def set_node_volume(node_id, volume):
147    """Set the volume of the given output node.
148
149    @param node_id: the id of the output node to be set the volume.
150    @param volume: the volume to be set(0-100).
151
152    """
153    get_cras_control_interface().SetOutputNodeVolume(node_id, volume)
154
155
156def set_capture_gain(gain):
157    """Set the system capture gain.
158
159    @param gain the capture gain in db*100 (100 = 1dB)
160
161    """
162    get_cras_control_interface().SetInputGain(gain)
163
164
165def get_cras_control_interface(private=False):
166    """Gets Cras DBus control interface.
167
168    @param private: Set to True to use a new instance for dbus.SystemBus
169                    instead of the shared instance.
170
171    @returns: A dBus.Interface object with Cras Control interface.
172
173    @raises: ImportError if this is not called on Cros device.
174
175    """
176    try:
177        import dbus
178    except ImportError, e:
179        logging.exception(
180                'Can not import dbus: %s. This method should only be '
181                'called on Cros device.', e)
182        raise
183    bus = dbus.SystemBus(private=private)
184    cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras')
185    return dbus.Interface(cras_object, 'org.chromium.cras.Control')
186
187
188def get_cras_nodes():
189    """Gets nodes information from Cras.
190
191    @returns: A dict containing information of each node.
192
193    """
194    return get_cras_control_interface().GetNodes()
195
196
197def get_selected_nodes():
198    """Gets selected output nodes and input nodes.
199
200    @returns: A tuple (output_nodes, input_nodes) where each
201              field is a list of selected node IDs returned from Cras DBus API.
202              Note that there may be multiple output/input nodes being selected
203              at the same time.
204
205    """
206    output_nodes = []
207    input_nodes = []
208    nodes = get_cras_nodes()
209    for node in nodes:
210        if node['Active']:
211            if node['IsInput']:
212                input_nodes.append(node['Id'])
213            else:
214                output_nodes.append(node['Id'])
215    return (output_nodes, input_nodes)
216
217
218def set_selected_output_node_volume(volume):
219    """Sets the selected output node volume.
220
221    @param volume: the volume to be set (0-100).
222
223    """
224    selected_output_node_ids, _ = get_selected_nodes()
225    for node_id in selected_output_node_ids:
226        set_node_volume(node_id, volume)
227
228
229def get_active_stream_count():
230    """Gets the number of active streams.
231
232    @returns: The number of active streams.
233
234    """
235    return int(get_cras_control_interface().GetNumberOfActiveStreams())
236
237
238def set_system_mute(is_mute):
239    """Sets the system mute switch.
240
241    @param is_mute: Set True to mute the system playback.
242
243    """
244    get_cras_control_interface().SetOutputMute(is_mute)
245
246
247def set_capture_mute(is_mute):
248    """Sets the capture mute switch.
249
250    @param is_mute: Set True to mute the capture.
251
252    """
253    get_cras_control_interface().SetInputMute(is_mute)
254
255
256def node_type_is_plugged(node_type, nodes_info):
257    """Determine if there is any node of node_type plugged.
258
259    This method is used in has_loopback_dongle in cros_host, where
260    the call is executed on autotest server. Use get_cras_nodes instead if
261    the call can be executed on Cros device.
262
263    Since Cras only reports the plugged node in GetNodes, we can
264    parse the return value to see if there is any node with the given type.
265    For example, if INTERNAL_MIC is of intereset, the pattern we are
266    looking for is:
267
268    dict entry(
269       string "Type"
270       variant             string "INTERNAL_MIC"
271    )
272
273    @param node_type: A str representing node type defined in CRAS_NODE_TYPES.
274    @param nodes_info: A str containing output of command get_nodes_cmd.
275
276    @returns: True if there is any node of node_type plugged. False otherwise.
277
278    """
279    match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type,
280                      nodes_info)
281    return True if match else False
282
283
284# Cras node types reported from Cras DBus control API.
285CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB',
286                          'BLUETOOTH', 'UNKNOWN']
287CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH',
288                         'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN',
289                         'KEYBOARD_MIC', 'HOTWORD']
290CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES
291
292
293def get_filtered_node_types(callback):
294    """Returns the pair of filtered output node types and input node types.
295
296    @param callback: A callback function which takes a node as input parameter
297                     and filter the node based on its return value.
298
299    @returns: A tuple (output_node_types, input_node_types) where each
300              field is a list of node types defined in CRAS_NODE_TYPES,
301              and their 'attribute_name' is True.
302
303    """
304    output_node_types = []
305    input_node_types = []
306    nodes = get_cras_nodes()
307    for node in nodes:
308        if callback(node):
309            node_type = str(node['Type'])
310            if node_type not in CRAS_NODE_TYPES:
311                raise RuntimeError(
312                        'node type %s is not valid' % node_type)
313            if node['IsInput']:
314                input_node_types.append(node_type)
315            else:
316                output_node_types.append(node_type)
317    return (output_node_types, input_node_types)
318
319
320def get_selected_node_types():
321    """Returns the pair of active output node types and input node types.
322
323    @returns: A tuple (output_node_types, input_node_types) where each
324              field is a list of selected node types defined in CRAS_NODE_TYPES.
325
326    """
327    def is_selected(node):
328        """Checks if a node is selected.
329
330        A node is selected if its Active attribute is True.
331
332        @returns: True is a node is selected, False otherwise.
333
334        """
335        return node['Active']
336
337    return get_filtered_node_types(is_selected)
338
339
340def get_plugged_node_types():
341    """Returns the pair of plugged output node types and input node types.
342
343    @returns: A tuple (output_node_types, input_node_types) where each
344              field is a list of plugged node types defined in CRAS_NODE_TYPES.
345
346    """
347    def is_plugged(node):
348        """Checks if a node is plugged and is not unknown node.
349
350        Cras DBus API only reports plugged node, so every node reported by Cras
351        DBus API is plugged. However, we filter out UNKNOWN node here because
352        the existence of unknown node depends on the number of redundant
353        playback/record audio device created on audio card. Also, the user of
354        Cras will ignore unknown nodes.
355
356        @returns: True if a node is plugged and is not an UNKNOWN node.
357
358        """
359        return node['Type'] != 'UNKNOWN'
360
361    return get_filtered_node_types(is_plugged)
362
363
364def set_selected_node_types(output_node_types, input_node_types):
365    """Sets selected node types.
366
367    @param output_node_types: A list of output node types. None to skip setting.
368    @param input_node_types: A list of input node types. None to skip setting.
369
370    """
371    if len(output_node_types) == 1:
372        set_single_selected_output_node(output_node_types[0])
373    elif output_node_types:
374        set_selected_output_nodes(output_node_types)
375    if len(input_node_types) == 1:
376        set_single_selected_input_node(input_node_types[0])
377    elif input_node_types:
378        set_selected_input_nodes(input_node_types)
379
380
381def set_single_selected_output_node(node_type):
382    """Sets one selected output node.
383
384    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
385    to select one output node.
386
387    @param node_type: A node type.
388
389    """
390    nodes = get_cras_nodes()
391    for node in nodes:
392        if node['IsInput']:
393            continue
394        if node['Type'] == node_type:
395            set_active_output_node(node['Id'])
396
397
398def set_single_selected_input_node(node_type):
399    """Sets one selected input node.
400
401    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
402    to select one input node.
403
404    @param node_type: A node type.
405
406    """
407    nodes = get_cras_nodes()
408    for node in nodes:
409        if not node['IsInput']:
410            continue
411        if node['Type'] == node_type:
412            set_active_input_node(node['Id'])
413
414
415def set_selected_output_nodes(types):
416    """Sets selected output node types.
417
418    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
419    to select one output node. Here we use add/remove active output node
420    to support multiple nodes.
421
422    @param types: A list of output node types.
423
424    """
425    nodes = get_cras_nodes()
426    for node in nodes:
427        if node['IsInput']:
428            continue
429        if node['Type'] in types:
430            add_active_output_node(node['Id'])
431        elif node['Active']:
432            remove_active_output_node(node['Id'])
433
434
435def set_selected_input_nodes(types):
436    """Sets selected input node types.
437
438    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
439    to select one input node. Here we use add/remove active input node
440    to support multiple nodes.
441
442    @param types: A list of input node types.
443
444    """
445    nodes = get_cras_nodes()
446    for node in nodes:
447        if not node['IsInput']:
448            continue
449        if node['Type'] in types:
450            add_active_input_node(node['Id'])
451        elif node['Active']:
452            remove_active_input_node(node['Id'])
453
454
455def set_active_input_node(node_id):
456    """Sets one active input node.
457
458    @param node_id: node id.
459
460    """
461    get_cras_control_interface().SetActiveInputNode(node_id)
462
463
464def set_active_output_node(node_id):
465    """Sets one active output node.
466
467    @param node_id: node id.
468
469    """
470    get_cras_control_interface().SetActiveOutputNode(node_id)
471
472
473def add_active_output_node(node_id):
474    """Adds an active output node.
475
476    @param node_id: node id.
477
478    """
479    get_cras_control_interface().AddActiveOutputNode(node_id)
480
481
482def add_active_input_node(node_id):
483    """Adds an active input node.
484
485    @param node_id: node id.
486
487    """
488    get_cras_control_interface().AddActiveInputNode(node_id)
489
490
491def remove_active_output_node(node_id):
492    """Removes an active output node.
493
494    @param node_id: node id.
495
496    """
497    get_cras_control_interface().RemoveActiveOutputNode(node_id)
498
499
500def remove_active_input_node(node_id):
501    """Removes an active input node.
502
503    @param node_id: node id.
504
505    """
506    get_cras_control_interface().RemoveActiveInputNode(node_id)
507
508
509def get_node_id_from_node_type(node_type, is_input):
510    """Gets node id from node type.
511
512    @param types: A node type defined in CRAS_NODE_TYPES.
513    @param is_input: True if the node is input. False otherwise.
514
515    @returns: A string for node id.
516
517    @raises: CrasUtilsError: if unique node id can not be found.
518
519    """
520    nodes = get_cras_nodes()
521    find_ids = []
522    for node in nodes:
523        if node['Type'] == node_type and node['IsInput'] == is_input:
524            find_ids.append(node['Id'])
525    if len(find_ids) != 1:
526        raise CrasUtilsError(
527                'Can not find unique node id from node type %s' % node_type)
528    return find_ids[0]
529
530def get_active_node_volume():
531    """Returns volume from active node.
532
533    @returns: int for volume
534
535    @raises: CrasUtilsError: if node volume cannot be found.
536    """
537    nodes = get_cras_nodes()
538    for node in nodes:
539        if node['Active'] == 1 and node['IsInput'] == 0:
540            return int(node['NodeVolume'])
541    raise CrasUtilsError('Cannot find active node volume from nodes.')
542