1# Copyright (c) 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides cras audio utilities."""
6
7import logging
8import re
9
10from autotest_lib.client.cros.audio import cmd_utils
11
# Absolute path of the cras_test_client executable. The *_cmd helpers in this
# module use it as the first element of the argument lists they build.
_CRAS_TEST_CLIENT = '/usr/bin/cras_test_client'
13
14
class CrasUtilsError(Exception):
    """Error raised by the cras utilities when a lookup can not be satisfied."""
17
18
def playback(blocking=True, *args, **kargs):
    """Launches the command built by playback_cmd.

    @param blocking: If true, the launched process is handed to
                     cmd_utils.wait_and_check_returncode before this call
                     returns.
    @param args: positional arguments forwarded to playback_cmd.
    @param kargs: keyword arguments forwarded to playback_cmd.

    @returns: The process object returned by cmd_utils.popen. When blocking
              is true the process has already been waited on.
    """
    command = playback_cmd(*args, **kargs)
    player = cmd_utils.popen(command)
    if not blocking:
        return player
    cmd_utils.wait_and_check_returncode(player)
    return player
33
34
def capture(*args, **kargs):
    """Runs the command built by capture_cmd through cmd_utils.execute.

    @param args: positional arguments forwarded to capture_cmd.
    @param kargs: keyword arguments forwarded to capture_cmd.

    """
    command = capture_cmd(*args, **kargs)
    cmd_utils.execute(command)
43
44
def playback_cmd(playback_file, block_size=None, duration=None,
                 channels=2, rate=48000):
    """Builds the cras_test_client argument list for playing back a file.

    @param playback_file: name of the file to play. '-' indicates to
                          playback raw audio from the stdin.
    @param block_size: number of frames per callback (dictates latency).
                       The option is omitted when this is None.
    @param duration: seconds to playback. The option is omitted when this
                     is None.
    @param channels: number of channels.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    cmd = [_CRAS_TEST_CLIENT, '--playback_file', playback_file]
    if block_size is not None:
        cmd.extend(['--block_size', str(block_size)])
    if duration is not None:
        cmd.extend(['--duration', str(duration)])
    # Channel count and rate are always passed explicitly.
    cmd.extend(['--num_channels', str(channels), '--rate', str(rate)])
    return cmd
68
69
def capture_cmd(
        capture_file, block_size=None, duration=10, channels=1, rate=48000):
    """Builds the cras_test_client argument list for capturing to a file.

    @param capture_file: name of the file the audio is stored in.
    @param block_size: number of frames per callback (dictates latency).
                       The option is omitted when this is None.
    @param duration: seconds to record. If it is None, duration is not set,
                     and command will keep capturing audio until it is
                     terminated.
    @param channels: number of channels.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    cmd = [_CRAS_TEST_CLIENT, '--capture_file', capture_file]
    # Options that are only emitted when the caller supplied a value.
    optional_flags = (('--block_size', block_size), ('--duration', duration))
    for flag, value in optional_flags:
        if value is not None:
            cmd += [flag, str(value)]
    cmd += ['--num_channels', str(channels), '--rate', str(rate)]
    return cmd
94
95
def listen_cmd(
        capture_file, block_size=None, duration=10, channels=1, rate=48000):
    """Builds the cras_test_client argument list for hotword listening.

    The resulting command listens for a hotword and records audio into the
    given file.

    @param capture_file: name of the file the audio is stored in.
    @param block_size: number of frames per callback (dictates latency).
                       The option is omitted when this is None.
    @param duration: seconds to record. If it is None, duration is not set,
                     and command will keep capturing audio until it is
                     terminated.
    @param channels: number of channels.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    if block_size is None:
        block_size_args = []
    else:
        block_size_args = ['--block_size', str(block_size)]
    if duration is None:
        duration_args = []
    else:
        duration_args = ['--duration', str(duration)]
    return ([_CRAS_TEST_CLIENT, '--listen_for_hotword', capture_file]
            + block_size_args
            + duration_args
            + ['--num_channels', str(channels), '--rate', str(rate)])
121
122
def loopback(*args, **kargs):
    """Runs the command built by loopback_cmd through cmd_utils.execute.

    @param args: positional arguments forwarded to loopback_cmd.
    @param kargs: keyword arguments forwarded to loopback_cmd.

    """
    command = loopback_cmd(*args, **kargs)
    cmd_utils.execute(command)
132
133
def loopback_cmd(output_file, duration=10, channels=2, rate=48000):
    """Builds the cras_test_client argument list for recording the loopback.

    Unlike the other command builders in this module, every option is always
    emitted and the duration is passed as --duration_seconds.

    @param output_file: The name of the file the loopback is stored in.
    @param duration: seconds to record.
    @param channels: The number of channels of the recorded audio.
    @param rate: the sampling rate.

    @returns: The command args put in a list of strings.

    """
    return [
        _CRAS_TEST_CLIENT,
        '--loopback_file', output_file,
        '--duration_seconds', str(duration),
        '--num_channels', str(channels),
        '--rate', str(rate),
    ]
151
152
def get_cras_nodes_cmd():
    """Builds the dbus-send command line that queries nodes from Cras.

    @returns: A single string holding the dbus-send command that calls
              org.chromium.cras.Control.GetNodes on the system bus.

    """
    words = [
        'dbus-send',
        '--system',
        '--type=method_call',
        '--print-reply',
        '--dest=org.chromium.cras',
        '/org/chromium/cras',
        'org.chromium.cras.Control.GetNodes',
    ]
    return ' '.join(words)
162
163
def set_system_volume(volume):
    """Sets the system output volume through the Cras control interface.

    @param volume: the system output volume to be set (0 - 100).

    """
    cras_control = get_cras_control_interface()
    cras_control.SetOutputVolume(volume)
171
172
def set_node_volume(node_id, volume):
    """Sets the volume of one output node through the Cras control interface.

    @param node_id: the id of the output node whose volume is set.
    @param volume: the volume to be set (0-100).

    """
    cras_control = get_cras_control_interface()
    cras_control.SetOutputNodeVolume(node_id, volume)
181
182
def set_capture_gain(gain):
    """Sets the system capture gain through the Cras control interface.

    @param gain: the capture gain in dB*100 (100 = 1dB).

    """
    cras_control = get_cras_control_interface()
    cras_control.SetInputGain(gain)
190
191
def get_cras_control_interface(private=False):
    """Gets Cras DBus control interface.

    @param private: Set to True to use a new instance for dbus.SystemBus
                    instead of the shared instance.

    @returns: A dbus.Interface object with Cras Control interface.

    @raises: ImportError if the dbus module can not be imported, i.e. this
             is not called on Cros device.

    """
    try:
        # dbus is imported here rather than at module level; other helpers in
        # this module (e.g. node_type_is_plugged) are meant to be usable
        # where dbus is not available.
        import dbus
    except ImportError as e:
        # "except X as e" is valid on Python 2.6+ and Python 3, unlike the
        # old "except X, e" form.
        logging.exception(
                'Can not import dbus: %s. This method should only be '
                'called on Cros device.', e)
        raise
    bus = dbus.SystemBus(private=private)
    cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras')
    return dbus.Interface(cras_object, 'org.chromium.cras.Control')
213
214
def get_cras_nodes():
    """Queries Cras for its nodes through the DBus control interface.

    @returns: The reply of the GetNodes DBus call. Callers in this module
              iterate over it and read each node with keys such as 'Id',
              'Type', 'Active' and 'IsInput'.

    """
    cras_control = get_cras_control_interface()
    return cras_control.GetNodes()
222
223
def get_selected_nodes():
    """Collects the IDs of the active output nodes and input nodes.

    @returns: A tuple (output_nodes, input_nodes) where each field is a list
              of the 'Id' values of the active nodes reported by Cras. More
              than one output/input node may be selected at the same time.

    """
    selected_outputs = []
    selected_inputs = []
    for node in get_cras_nodes():
        if not node['Active']:
            continue
        # Route the id to the list matching the node's direction.
        target = selected_inputs if node['IsInput'] else selected_outputs
        target.append(node['Id'])
    return (selected_outputs, selected_inputs)
243
244
def set_selected_output_node_volume(volume):
    """Applies the given volume to every selected output node.

    @param volume: the volume to be set (0-100).

    """
    # get_selected_nodes returns (output ids, input ids); only outputs matter.
    output_ids = get_selected_nodes()[0]
    for output_id in output_ids:
        set_node_volume(output_id, volume)
254
255
def get_active_stream_count():
    """Queries Cras for the number of active streams.

    @returns: The number of active streams, converted to int.

    """
    stream_count = get_cras_control_interface().GetNumberOfActiveStreams()
    return int(stream_count)
263
264
def set_system_mute(is_mute):
    """Sets the system output mute switch through the Cras control interface.

    @param is_mute: Set True to mute the system playback.

    """
    cras_control = get_cras_control_interface()
    cras_control.SetOutputMute(is_mute)
272
273
def set_capture_mute(is_mute):
    """Sets the capture mute switch through the Cras control interface.

    @param is_mute: Set True to mute the capture.

    """
    cras_control = get_cras_control_interface()
    cras_control.SetInputMute(is_mute)
281
282
def node_type_is_plugged(node_type, nodes_info):
    """Determine if there is any node of node_type plugged.

    This method is used in the AudioLoopbackDongleLabel class, where the
    call is executed on autotest server. Use get_cras_nodes instead if
    the call can be executed on Cros device.

    Since Cras only reports the plugged node in GetNodes, we can
    parse the return value to see if there is any node with the given type.
    For example, if INTERNAL_MIC is of interest, the pattern we are
    looking for is:

    dict entry(
       string "Type"
       variant             string "INTERNAL_MIC"
    )

    @param node_type: A str representing node type defined in CRAS_NODE_TYPES.
                      It is matched literally, not as a regular expression.
    @param nodes_info: A str containing output of the command returned by
                       get_cras_nodes_cmd.

    @returns: True if there is any node of node_type plugged. False otherwise.

    """
    # Escape node_type so that regex metacharacters in it can not widen the
    # match (e.g. '.*' must not match every node type).
    pattern = r'string "Type"\s+variant\s+string "%s"' % re.escape(node_type)
    return re.search(pattern, nodes_info) is not None
309
310
# Node type names reported from the Cras DBus control API, grouped by
# direction. CRAS_NODE_TYPES is the plain concatenation of both lists, so a
# type valid for both directions (e.g. 'USB') appears in it twice.
CRAS_OUTPUT_NODE_TYPES = [
    'HEADPHONE',
    'INTERNAL_SPEAKER',
    'HDMI',
    'USB',
    'BLUETOOTH',
    'LINEOUT',
    'UNKNOWN',
]
CRAS_INPUT_NODE_TYPES = [
    'MIC',
    'INTERNAL_MIC',
    'USB',
    'BLUETOOTH',
    'POST_DSP_LOOPBACK',
    'POST_MIX_LOOPBACK',
    'UNKNOWN',
    'KEYBOARD_MIC',
    'HOTWORD',
    'FRONT_MIC',
    'REAR_MIC',
]
CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES
318
319
def get_filtered_node_types(callback):
    """Returns the types of the Cras nodes accepted by a filter callback.

    @param callback: A function that takes a node as its only argument. The
                     node is kept when the return value is true.

    @returns: A tuple (output_node_types, input_node_types) where each
              field is a list of the types, as str, of the kept nodes.

    @raises: RuntimeError if a kept node has a type that is not listed in
             CRAS_NODE_TYPES.

    """
    kept_output_types = []
    kept_input_types = []
    for node in get_cras_nodes():
        if not callback(node):
            continue
        type_name = str(node['Type'])
        if type_name not in CRAS_NODE_TYPES:
            raise RuntimeError('node type %s is not valid' % type_name)
        bucket = kept_input_types if node['IsInput'] else kept_output_types
        bucket.append(type_name)
    return (kept_output_types, kept_input_types)
345
346
def get_selected_node_types():
    """Returns the types of the selected output nodes and input nodes.

    A node counts as selected when its 'Active' attribute is true.

    @returns: A tuple (output_node_types, input_node_types) where each
              field is a list of selected node types defined in CRAS_NODE_TYPES.

    """
    return get_filtered_node_types(lambda node: node['Active'])
365
366
def get_selected_input_device_name():
    """Returns the device name of the first active input node.

    @returns: device name string, e.g. kbl_r5514_5663_max: :0,1, or None
              when no input node is active.
    """
    active_input_names = (
            node['DeviceName'] for node in get_cras_nodes()
            if node['Active'] and node['IsInput'])
    # The generator is lazy, so scanning stops at the first match.
    return next(active_input_names, None)
377
378
def get_selected_output_device_name():
    """Returns the device name of the first active output node.

    @returns: device name string, e.g. mtk-rt5650: :0,0, or None when no
              output node is active.
    """
    for node in get_cras_nodes():
        # Skip anything that is inactive or is an input node.
        if not node['Active'] or node['IsInput']:
            continue
        return node['DeviceName']
    return None
389
390
def get_selected_output_device_type():
    """Returns the type of the first active output node.

    @returns: device type string, e.g. INTERNAL_SPEAKER, or None when no
              output node is active.
    """
    active_output_types = (
            node['Type'] for node in get_cras_nodes()
            if node['Active'] and not node['IsInput'])
    # The generator is lazy, so scanning stops at the first match.
    return next(active_output_types, None)
401
402
def get_plugged_node_types():
    """Returns the types of the plugged output nodes and input nodes.

    Cras DBus API only reports plugged nodes, so every reported node is
    plugged. UNKNOWN nodes are filtered out because their existence depends
    on the number of redundant playback/record audio devices created on the
    audio card, and users of Cras ignore them.

    @returns: A tuple (output_node_types, input_node_types) where each
              field is a list of plugged node types defined in CRAS_NODE_TYPES.

    """
    def _not_unknown(node):
        """Tells whether a node has a type other than UNKNOWN.

        @returns: True if the node type is not 'UNKNOWN', False otherwise.

        """
        return node['Type'] != 'UNKNOWN'

    return get_filtered_node_types(_not_unknown)
425
426
def set_selected_node_types(output_node_types, input_node_types):
    """Sets selected node types.

    For each direction, a single type is selected with the "single selected
    node" helper, while two or more types are selected with the add/remove
    based helper that supports multiple active nodes.

    @param output_node_types: A list of output node types. None or an empty
                              list to skip setting.
    @param input_node_types: A list of input node types. None or an empty
                             list to skip setting.

    """
    # Test truthiness before calling len() so that None, which is documented
    # as "skip", does not raise TypeError.
    if output_node_types:
        if len(output_node_types) == 1:
            set_single_selected_output_node(output_node_types[0])
        else:
            set_selected_output_nodes(output_node_types)
    if input_node_types:
        if len(input_node_types) == 1:
            set_single_selected_input_node(input_node_types[0])
        else:
            set_selected_input_nodes(input_node_types)
442
443
def set_single_selected_output_node(node_type):
    """Makes the output node(s) of the given type the active output node.

    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
    to select one output node. set_active_output_node is called for every
    output node whose type matches.

    @param node_type: A node type.

    """
    for node in get_cras_nodes():
        if not node['IsInput'] and node['Type'] == node_type:
            set_active_output_node(node['Id'])
459
460
def set_single_selected_input_node(node_type):
    """Makes the input node(s) of the given type the active input node.

    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
    to select one input node. set_active_input_node is called for every
    input node whose type matches.

    @param node_type: A node type.

    """
    for node in get_cras_nodes():
        if node['IsInput'] and node['Type'] == node_type:
            set_active_input_node(node['Id'])
476
477
def set_selected_output_nodes(types):
    """Selects every output node whose type is in the given list.

    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
    to select one output node. Here add/remove active output node is used
    instead, to support multiple nodes: matching nodes are added, and active
    nodes of any other type are removed.

    @param types: A list of output node types.

    """
    # Lazy filter, so nodes are still examined one at a time.
    output_nodes = (node for node in get_cras_nodes() if not node['IsInput'])
    for node in output_nodes:
        wanted = node['Type'] in types
        if wanted:
            add_active_output_node(node['Id'])
        elif node['Active']:
            remove_active_output_node(node['Id'])
496
497
def set_selected_input_nodes(types):
    """Selects every input node whose type is in the given list.

    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
    to select one input node. Here add/remove active input node is used
    instead, to support multiple nodes: matching nodes are added, and active
    nodes of any other type are removed.

    @param types: A list of input node types.

    """
    # Lazy filter, so nodes are still examined one at a time.
    input_nodes = (node for node in get_cras_nodes() if node['IsInput'])
    for node in input_nodes:
        wanted = node['Type'] in types
        if wanted:
            add_active_input_node(node['Id'])
        elif node['Active']:
            remove_active_input_node(node['Id'])
516
517
def set_active_input_node(node_id):
    """Calls SetActiveInputNode on the Cras control interface.

    @param node_id: id of the input node to make active.

    """
    cras_control = get_cras_control_interface()
    cras_control.SetActiveInputNode(node_id)
525
526
def set_active_output_node(node_id):
    """Calls SetActiveOutputNode on the Cras control interface.

    @param node_id: id of the output node to make active.

    """
    cras_control = get_cras_control_interface()
    cras_control.SetActiveOutputNode(node_id)
534
535
def add_active_output_node(node_id):
    """Calls AddActiveOutputNode on the Cras control interface.

    @param node_id: id of the output node to add to the active nodes.

    """
    cras_control = get_cras_control_interface()
    cras_control.AddActiveOutputNode(node_id)
543
544
def add_active_input_node(node_id):
    """Calls AddActiveInputNode on the Cras control interface.

    @param node_id: id of the input node to add to the active nodes.

    """
    cras_control = get_cras_control_interface()
    cras_control.AddActiveInputNode(node_id)
552
553
def remove_active_output_node(node_id):
    """Calls RemoveActiveOutputNode on the Cras control interface.

    @param node_id: id of the output node to remove from the active nodes.

    """
    cras_control = get_cras_control_interface()
    cras_control.RemoveActiveOutputNode(node_id)
561
562
def remove_active_input_node(node_id):
    """Calls RemoveActiveInputNode on the Cras control interface.

    @param node_id: id of the input node to remove from the active nodes.

    """
    cras_control = get_cras_control_interface()
    cras_control.RemoveActiveInputNode(node_id)
570
571
def get_node_id_from_node_type(node_type, is_input):
    """Looks up the id of the only node with the given type and direction.

    @param node_type: A node type defined in CRAS_NODE_TYPES.
    @param is_input: True if the node is input. False otherwise.

    @returns: The 'Id' value of the matching node.

    @raises: CrasUtilsError: if there is no matching node, or more than one.

    """
    matching_ids = [
            node['Id'] for node in get_cras_nodes()
            if node['Type'] == node_type and node['IsInput'] == is_input]
    if len(matching_ids) == 1:
        return matching_ids[0]
    raise CrasUtilsError(
            'Can not find unique node id from node type %s' % node_type)
592
def get_active_node_volume():
    """Returns the volume of the first active output node.

    @returns: The node's 'NodeVolume' value converted to int.

    @raises: CrasUtilsError: if no active output node is found.
    """
    for node in get_cras_nodes():
        is_active_output = node['Active'] == 1 and node['IsInput'] == 0
        if is_active_output:
            return int(node['NodeVolume'])
    raise CrasUtilsError('Cannot find active node volume from nodes.')
605