1# Copyright (c) 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides cras audio utilities."""
6
7import logging
8import re
9
10from autotest_lib.client.cros.audio import cmd_utils
11
12_CRAS_TEST_CLIENT = '/usr/bin/cras_test_client'
13
14
15class CrasUtilsError(Exception):
16    pass
17
18
19def playback(*args, **kargs):
20    """A helper function to execute the playback_cmd.
21
22    @param args: args passed to playback_cmd.
23    @param kargs: kargs passed to playback_cmd.
24
25    """
26    cmd_utils.execute(playback_cmd(*args, **kargs))
27
28
29def capture(*args, **kargs):
30    """A helper function to execute the capture_cmd.
31
32    @param args: args passed to capture_cmd.
33    @param kargs: kargs passed to capture_cmd.
34
35    """
36    cmd_utils.execute(capture_cmd(*args, **kargs))
37
38
39def playback_cmd(playback_file, block_size=None, duration=None,
40                 channels=2, rate=48000):
41    """Gets a command to playback a file with given settings.
42
43    @param playback_file: the name of the file to play. '-' indicates to
44                          playback raw audio from the stdin.
45    @param block_size: the number of frames per callback(dictates latency).
46    @param duration: seconds to playback
47    @param channels: number of channels.
48    @param rate: the sampling rate
49
50    @returns: The command args put in a list of strings.
51
52    """
53    args = [_CRAS_TEST_CLIENT]
54    args += ['--playback_file', playback_file]
55    if block_size is not None:
56        args += ['--block_size', str(block_size)]
57    if duration is not None:
58        args += ['--duration', str(duration)]
59    args += ['--num_channels', str(channels)]
60    args += ['--rate', str(rate)]
61    return args
62
63
64def capture_cmd(
65        capture_file, block_size=None, duration=10, channels=1, rate=48000):
66    """Gets a command to capture the audio into the file with given settings.
67
68    @param capture_file: the name of file the audio to be stored in.
69    @param block_size: the number of frames per callback(dictates latency).
70    @param duration: seconds to record. If it is None, duration is not set,
71                     and command will keep capturing audio until it is
72                     terminated.
73    @param channels: number of channels.
74    @param rate: the sampling rate.
75
76    @returns: The command args put in a list of strings.
77
78    """
79    args = [_CRAS_TEST_CLIENT]
80    args += ['--capture_file', capture_file]
81    if block_size is not None:
82        args += ['--block_size', str(block_size)]
83    if duration is not None:
84        args += ['--duration', str(duration)]
85    args += ['--num_channels', str(channels)]
86    args += ['--rate', str(rate)]
87    return args
88
89
90def loopback(*args, **kargs):
91    """A helper function to execute loopback_cmd.
92
93    @param args: args passed to loopback_cmd.
94    @param kargs: kargs passed to loopback_cmd.
95
96    """
97
98    cmd_utils.execute(loopback_cmd(*args, **kargs))
99
100
101def loopback_cmd(output_file, duration=10, channels=2, rate=48000):
102    """Gets a command to record the loopback.
103
104    @param output_file: The name of the file the loopback to be stored in.
105    @param channels: The number of channels of the recorded audio.
106    @param duration: seconds to record.
107    @param rate: the sampling rate.
108
109    @returns: The command args put in a list of strings.
110
111    """
112    args = [_CRAS_TEST_CLIENT]
113    args += ['--loopback_file', output_file]
114    args += ['--duration_seconds', str(duration)]
115    args += ['--num_channels', str(channels)]
116    args += ['--rate', str(rate)]
117    return args
118
119
120def get_cras_nodes_cmd():
121    """Gets a command to query the nodes from Cras.
122
123    @returns: The command to query nodes information from Cras using dbus-send.
124
125    """
126    return ('dbus-send --system --type=method_call --print-reply '
127            '--dest=org.chromium.cras /org/chromium/cras '
128            'org.chromium.cras.Control.GetNodes')
129
130
131def set_system_volume(volume):
132    """Set the system volume.
133
134    @param volume: the system output vlume to be set(0 - 100).
135
136    """
137    get_cras_control_interface().SetOutputVolume(volume)
138
139
140def set_node_volume(node_id, volume):
141    """Set the volume of the given output node.
142
143    @param node_id: the id of the output node to be set the volume.
144    @param volume: the volume to be set(0-100).
145
146    """
147    get_cras_control_interface().SetOutputNodeVolume(node_id, volume)
148
149
150def set_capture_gain(gain):
151    """Set the system capture gain.
152
153    @param gain the capture gain in db*100 (100 = 1dB)
154
155    """
156    get_cras_control_interface().SetInputGain(gain)
157
158
159def get_cras_control_interface(private=False):
160    """Gets Cras DBus control interface.
161
162    @param private: Set to True to use a new instance for dbus.SystemBus
163                    instead of the shared instance.
164
165    @returns: A dBus.Interface object with Cras Control interface.
166
167    @raises: ImportError if this is not called on Cros device.
168
169    """
170    try:
171        import dbus
172    except ImportError, e:
173        logging.exception(
174                'Can not import dbus: %s. This method should only be '
175                'called on Cros device.', e)
176        raise
177    bus = dbus.SystemBus(private=private)
178    cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras')
179    return dbus.Interface(cras_object, 'org.chromium.cras.Control')
180
181
182def get_cras_nodes():
183    """Gets nodes information from Cras.
184
185    @returns: A dict containing information of each node.
186
187    """
188    return get_cras_control_interface().GetNodes()
189
190
191def get_selected_nodes():
192    """Gets selected output nodes and input nodes.
193
194    @returns: A tuple (output_nodes, input_nodes) where each
195              field is a list of selected node IDs returned from Cras DBus API.
196              Note that there may be multiple output/input nodes being selected
197              at the same time.
198
199    """
200    output_nodes = []
201    input_nodes = []
202    nodes = get_cras_nodes()
203    for node in nodes:
204        if node['Active']:
205            if node['IsInput']:
206                input_nodes.append(node['Id'])
207            else:
208                output_nodes.append(node['Id'])
209    return (output_nodes, input_nodes)
210
211
212def set_selected_output_node_volume(volume):
213    """Sets the selected output node volume.
214
215    @param volume: the volume to be set (0-100).
216
217    """
218    selected_output_node_ids, _ = get_selected_nodes()
219    for node_id in selected_output_node_ids:
220        set_node_volume(node_id, volume)
221
222
223def get_active_stream_count():
224    """Gets the number of active streams.
225
226    @returns: The number of active streams.
227
228    """
229    return int(get_cras_control_interface().GetNumberOfActiveStreams())
230
231
232def set_system_mute(is_mute):
233    """Sets the system mute switch.
234
235    @param is_mute: Set True to mute the system playback.
236
237    """
238    get_cras_control_interface().SetOutputMute(is_mute)
239
240
241def set_capture_mute(is_mute):
242    """Sets the capture mute switch.
243
244    @param is_mute: Set True to mute the capture.
245
246    """
247    get_cras_control_interface().SetInputMute(is_mute)
248
249
250def node_type_is_plugged(node_type, nodes_info):
251    """Determine if there is any node of node_type plugged.
252
253    This method is used in has_loopback_dongle in cros_host, where
254    the call is executed on autotest server. Use get_cras_nodes instead if
255    the call can be executed on Cros device.
256
257    Since Cras only reports the plugged node in GetNodes, we can
258    parse the return value to see if there is any node with the given type.
259    For example, if INTERNAL_MIC is of intereset, the pattern we are
260    looking for is:
261
262    dict entry(
263       string "Type"
264       variant             string "INTERNAL_MIC"
265    )
266
267    @param node_type: A str representing node type defined in CRAS_NODE_TYPES.
268    @param nodes_info: A str containing output of command get_nodes_cmd.
269
270    @returns: True if there is any node of node_type plugged. False otherwise.
271
272    """
273    match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type,
274                      nodes_info)
275    return True if match else False
276
277
278# Cras node types reported from Cras DBus control API.
279CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB',
280                          'BLUETOOTH', 'UNKNOWN']
281CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH',
282                         'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN',
283                         'KEYBOARD_MIC', 'AOKR']
284CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES
285
286
287def get_filtered_node_types(callback):
288    """Returns the pair of filtered output node types and input node types.
289
290    @param callback: A callback function which takes a node as input parameter
291                     and filter the node based on its return value.
292
293    @returns: A tuple (output_node_types, input_node_types) where each
294              field is a list of node types defined in CRAS_NODE_TYPES,
295              and their 'attribute_name' is True.
296
297    """
298    output_node_types = []
299    input_node_types = []
300    nodes = get_cras_nodes()
301    for node in nodes:
302        if callback(node):
303            node_type = str(node['Type'])
304            if node_type not in CRAS_NODE_TYPES:
305                raise RuntimeError(
306                        'node type %s is not valid' % node_type)
307            if node['IsInput']:
308                input_node_types.append(node_type)
309            else:
310                output_node_types.append(node_type)
311    return (output_node_types, input_node_types)
312
313
314def get_selected_node_types():
315    """Returns the pair of active output node types and input node types.
316
317    @returns: A tuple (output_node_types, input_node_types) where each
318              field is a list of selected node types defined in CRAS_NODE_TYPES.
319
320    """
321    def is_selected(node):
322        """Checks if a node is selected.
323
324        A node is selected if its Active attribute is True.
325
326        @returns: True is a node is selected, False otherwise.
327
328        """
329        return node['Active']
330
331    return get_filtered_node_types(is_selected)
332
333
334def get_plugged_node_types():
335    """Returns the pair of plugged output node types and input node types.
336
337    @returns: A tuple (output_node_types, input_node_types) where each
338              field is a list of plugged node types defined in CRAS_NODE_TYPES.
339
340    """
341    def is_plugged(node):
342        """Checks if a node is plugged and is not unknown node.
343
344        Cras DBus API only reports plugged node, so every node reported by Cras
345        DBus API is plugged. However, we filter out UNKNOWN node here because
346        the existence of unknown node depends on the number of redundant
347        playback/record audio device created on audio card. Also, the user of
348        Cras will ignore unknown nodes.
349
350        @returns: True if a node is plugged and is not an UNKNOWN node.
351
352        """
353        return node['Type'] != 'UNKNOWN'
354
355    return get_filtered_node_types(is_plugged)
356
357
358def set_selected_node_types(output_node_types, input_node_types):
359    """Sets selected node types.
360
361    @param output_node_types: A list of output node types. None to skip setting.
362    @param input_node_types: A list of input node types. None to skip setting.
363
364    """
365    if len(output_node_types) == 1:
366        set_single_selected_output_node(output_node_types[0])
367    elif output_node_types:
368        set_selected_output_nodes(output_node_types)
369    if len(input_node_types) == 1:
370        set_single_selected_input_node(input_node_types[0])
371    elif input_node_types:
372        set_selected_input_nodes(input_node_types)
373
374
375def set_single_selected_output_node(node_type):
376    """Sets one selected output node.
377
378    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
379    to select one output node.
380
381    @param node_type: A node type.
382
383    """
384    nodes = get_cras_nodes()
385    for node in nodes:
386        if node['IsInput']:
387            continue
388        if node['Type'] == node_type:
389            set_active_output_node(node['Id'])
390
391
392def set_single_selected_input_node(node_type):
393    """Sets one selected input node.
394
395    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
396    to select one input node.
397
398    @param node_type: A node type.
399
400    """
401    nodes = get_cras_nodes()
402    for node in nodes:
403        if not node['IsInput']:
404            continue
405        if node['Type'] == node_type:
406            set_active_input_node(node['Id'])
407
408
409def set_selected_output_nodes(types):
410    """Sets selected output node types.
411
412    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
413    to select one output node. Here we use add/remove active output node
414    to support multiple nodes.
415
416    @param types: A list of output node types.
417
418    """
419    nodes = get_cras_nodes()
420    for node in nodes:
421        if node['IsInput']:
422            continue
423        if node['Type'] in types:
424            add_active_output_node(node['Id'])
425        elif node['Active']:
426            remove_active_output_node(node['Id'])
427
428
429def set_selected_input_nodes(types):
430    """Sets selected input node types.
431
432    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
433    to select one input node. Here we use add/remove active input node
434    to support multiple nodes.
435
436    @param types: A list of input node types.
437
438    """
439    nodes = get_cras_nodes()
440    for node in nodes:
441        if not node['IsInput']:
442            continue
443        if node['Type'] in types:
444            add_active_input_node(node['Id'])
445        elif node['Active']:
446            remove_active_input_node(node['Id'])
447
448
449def set_active_input_node(node_id):
450    """Sets one active input node.
451
452    @param node_id: node id.
453
454    """
455    get_cras_control_interface().SetActiveInputNode(node_id)
456
457
458def set_active_output_node(node_id):
459    """Sets one active output node.
460
461    @param node_id: node id.
462
463    """
464    get_cras_control_interface().SetActiveOutputNode(node_id)
465
466
467def add_active_output_node(node_id):
468    """Adds an active output node.
469
470    @param node_id: node id.
471
472    """
473    get_cras_control_interface().AddActiveOutputNode(node_id)
474
475
476def add_active_input_node(node_id):
477    """Adds an active input node.
478
479    @param node_id: node id.
480
481    """
482    get_cras_control_interface().AddActiveInputNode(node_id)
483
484
485def remove_active_output_node(node_id):
486    """Removes an active output node.
487
488    @param node_id: node id.
489
490    """
491    get_cras_control_interface().RemoveActiveOutputNode(node_id)
492
493
494def remove_active_input_node(node_id):
495    """Removes an active input node.
496
497    @param node_id: node id.
498
499    """
500    get_cras_control_interface().RemoveActiveInputNode(node_id)
501
502
503def get_node_id_from_node_type(node_type, is_input):
504    """Gets node id from node type.
505
506    @param types: A node type defined in CRAS_NODE_TYPES.
507    @param is_input: True if the node is input. False otherwise.
508
509    @returns: A string for node id.
510
511    @raises: CrasUtilsError: if unique node id can not be found.
512
513    """
514    nodes = get_cras_nodes()
515    find_ids = []
516    for node in nodes:
517        if node['Type'] == node_type and node['IsInput'] == is_input:
518            find_ids.append(node['Id'])
519    if len(find_ids) != 1:
520        raise CrasUtilsError(
521                'Can not find unique node id from node type %s' % node_type)
522    return find_ids[0]
523