1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides the test utilities for audio tests using chameleon."""
6
7# TODO (cychiang) Move test utilities from chameleon_audio_helpers
8# to this module.
9
10import logging
11import multiprocessing
12import os
13import time
14from contextlib import contextmanager
15
16from autotest_lib.client.common_lib import error
17from autotest_lib.client.cros import constants
18from autotest_lib.client.cros.audio import audio_analysis
19from autotest_lib.client.cros.audio import audio_data
20
def check_audio_nodes(audio_facade, audio_nodes):
    """Checks the node selected by Cros device is correct.

    The currently selected node types are read from the device and compared,
    order-insensitively, with the expected ones. Input nodes are checked
    before output nodes, so a mismatch on both reports the input one.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected selected output and input nodes. Either
                        element may be None to skip the check for that
                        direction. An empty list is still checked.

    @raises: error.TestFail if the nodes selected by Cros device are not
             expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_selected_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes

    # None is the "do not check" marker, so test identity rather than
    # equality: 'is not None' cannot be affected by how the expected value
    # implements comparison.
    if (in_audio_nodes is not None and
        sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail('Wrong input node(s) selected %s '
                'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes)))
    if (out_audio_nodes is not None and
        sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail('Wrong output node(s) selected %s '
                'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes)))
43
44
def check_plugged_nodes(audio_facade, audio_nodes):
    """Checks the nodes that are currently plugged on Cros device are correct.

    The currently plugged node types are read from the device and compared,
    order-insensitively, with the expected ones. Input nodes are checked
    before output nodes, so a mismatch on both reports the input one.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @param audio_nodes: A tuple (out_audio_nodes, in_audio_nodes) containing
                        expected plugged output and input nodes. Either
                        element may be None to skip the check for that
                        direction. An empty list is still checked.

    @raises: error.TestFail if the plugged nodes on Cros device are not
             expected.

    """
    curr_out_nodes, curr_in_nodes = audio_facade.get_plugged_node_types()
    out_audio_nodes, in_audio_nodes = audio_nodes

    # None is the "do not check" marker, so test identity rather than
    # equality: 'is not None' cannot be affected by how the expected value
    # implements comparison.
    if (in_audio_nodes is not None and
        sorted(curr_in_nodes) != sorted(in_audio_nodes)):
        raise error.TestFail('Wrong input node(s) plugged %s '
                'instead %s!' % (str(curr_in_nodes), str(in_audio_nodes)))
    if (out_audio_nodes is not None and
        sorted(curr_out_nodes) != sorted(out_audio_nodes)):
        raise error.TestFail('Wrong output node(s) plugged %s '
                'instead %s!' % (str(curr_out_nodes), str(out_audio_nodes)))
67
68
def bluetooth_nodes_plugged(audio_facade):
    """Tells whether bluetooth nodes are plugged in both directions.

    This function only reports the state; it does not raise on its own when
    a bluetooth node is missing.

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.

    @returns: True if 'BLUETOOTH' is among both the plugged output node
              types and the plugged input node types. False otherwise.

    """
    plugged_out, plugged_in = audio_facade.get_plugged_node_types()
    if 'BLUETOOTH' not in plugged_out:
        return False
    return 'BLUETOOTH' in plugged_in
81
82
def _get_board_name(host):
    """Extracts the board name from the host's board label.

    The value returned by host.get_board() is split on ':' and the second
    field is taken as the board name.

    @param host: The CrosHost object.

    @returns: The board name.

    """
    board_label = host.get_board()
    label_fields = board_label.split(':')
    return label_fields[1]
92
93
def correction_plug_unplug_for_audio(host, port):
    """Plugs/unplugs several times for Cros device to detect audio.

    For issue crbug.com/450101, Exynos HDMI driver has problem recognizing
    HDMI audio, while display can be detected. Do several plug/unplug and wait
    as a workaround. Note that port will be in unplugged state in the end if
    extra plug/unplug is needed.

    Boards outside the affected list are left untouched.

    @param host: A CrosHost object.
    @param port: A ChameleonVideoInput object.

    """
    board = _get_board_name(host)
    if board in ('peach_pit', 'peach_pi', 'daisy', 'daisy_spring',
                 'daisy_skate'):
        logging.info('Need extra plug/unplug on board %s', board)
        # range() instead of the Python-2-only xrange(): for three
        # iterations they behave the same, and range() also exists on
        # Python 3.
        for _ in range(3):
            port.plug()
            time.sleep(3)
            port.unplug()
            time.sleep(3)
115
116
def has_internal_speaker(host):
    """Tells whether the Cros device has an internal speaker.

    A device whose board type is 'CHROMEBOX' is reported as having no
    speaker, except for board 'stumpy'.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal speaker. False otherwise.

    """
    board = _get_board_name(host)
    lacks_speaker = (host.get_board_type() == 'CHROMEBOX' and
                     board != 'stumpy')
    if lacks_speaker:
        logging.info('Board %s does not have speaker.', board)
    return not lacks_speaker
130
131
def has_internal_microphone(host):
    """Tells whether the Cros device has an internal microphone.

    A device whose board type is 'CHROMEBOX' is reported as having no
    internal microphone.

    @param host: The CrosHost object.

    @returns: True if Cros device has internal microphone. False otherwise.

    """
    board = _get_board_name(host)
    if host.get_board_type() != 'CHROMEBOX':
        return True
    logging.info('Board %s does not have internal microphone.', board)
    return False
145
146
def suspend_resume(host, suspend_time_secs, resume_network_timeout_secs=50):
    """Performs the suspend/resume on Cros device.

    host.suspend() is issued from a separate daemon process while this
    process waits for the device to go to sleep and then to come back.

    @param host: The CrosHost object.
    @param suspend_time_secs: Time in seconds to let Cros device suspend.
    @param resume_network_timeout_secs: Time in seconds to let Cros device
                                        resume and obtain network.

    """
    def _run_suspend():
        """Calls the host method suspend."""
        host.suspend(suspend_time=suspend_time_secs)

    # Read the boot id before suspending; it is handed to the resume wait.
    boot_id_before = host.get_boot_id()

    suspend_proc = multiprocessing.Process(target=_run_suspend)
    logging.info("Suspending...")
    suspend_proc.daemon = True
    suspend_proc.start()

    # Allow a third of the suspend time for the device to fall asleep.
    sleep_timeout = suspend_time_secs / 3
    host.test_wait_for_sleep(sleep_timeout)
    logging.info("DUT suspended! Waiting to resume...")

    resume_timeout = suspend_time_secs + resume_network_timeout_secs
    host.test_wait_for_resume(boot_id_before, resume_timeout)
    logging.info("DUT resumed!")
168
169
def dump_cros_audio_logs(host, audio_facade, directory, suffix=''):
    """Dumps logs for audio debugging from Cros device.

    Three files are written under directory: the audio diagnostics dumped
    through audio_facade, a copy of /var/log/messages, and a copy of the
    multimedia xmlrpc server log.

    @param host: The CrosHost object.
    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param directory: The directory to dump logs.
    @param suffix: Optional suffix. When non-empty, each dumped file is named
                   '<name>.<suffix>' instead of '<name>'.

    """
    def local_path(base_name):
        """Builds the destination path for a dumped file.

        @param base_name: The file name without suffix.

        @returns: The path under directory, with the suffix appended if any.

        """
        if suffix:
            base_name = '%s.%s' % (base_name, suffix)
        return os.path.join(directory, base_name)

    diagnostics_path = local_path('audio_diagnostics.txt')
    audio_facade.dump_diagnostics(diagnostics_path)

    messages_path = local_path('messages')
    host.get_file('/var/log/messages', messages_path)

    host.get_file(constants.MULTIMEDIA_XMLRPC_SERVER_LOG_FILE,
                  local_path('multimedia_xmlrpc_server.log'))
197
198
@contextmanager
def monitor_no_nodes_changed(audio_facade, callback=None):
    """Context manager to monitor nodes changed signal on Cros device.

    Starts counting NodesChanged signals on entry and stops counting on
    exit, whether or not the body raised. A non-zero count is treated as a
    test failure.

    E.g. with monitor_no_nodes_changed(audio_facade):
             do something on playback/recording

    @param audio_facade: A RemoteAudioFacade to access audio functions on
                         Cros device.
    @param callback: Optional callable invoked with no arguments, after the
                     error is logged and before TestFail is raised, when
                     there are unexpected NodesChanged signals.

    @raises: error.TestFail if there is NodesChanged signal on
             Cros device during the context.

    """
    try:
        audio_facade.start_counting_signal('NodesChanged')
        yield
    finally:
        signal_count = audio_facade.stop_counting_signal()
        if signal_count:
            failure = 'Got %d unexpected NodesChanged signal' % signal_count
            logging.error(failure)
            if callback:
                callback()
            raise error.TestFail(failure)
229
230
# Default upper bound on the coefficient of the second dominant frequency,
# expressed as a fraction of the coefficient of the first dominant frequency.
# A ratio of 0.05 is roughly -26dB relative to the first dominant frequency.
DEFAULT_SECOND_PEAK_RATIO = 0.05

# A looser second-peak ratio, to tolerate more for bluetooth audio using HSP.
HSP_SECOND_PEAK_RATIO = 0.2

# Default maximum allowed deviation of the estimated dominant frequency from
# the golden frequency.
DEFAULT_FREQUENCY_DIFF_THRESHOLD = 5
240
def check_recorded_frequency(
        golden_file, recorder,
        second_peak_ratio=DEFAULT_SECOND_PEAK_RATIO,
        frequency_diff_threshold=DEFAULT_FREQUENCY_DIFF_THRESHOLD,
        ignore_frequencies=None, check_anomaly=False):
    """Checks if the recorded data contains sine tone of golden frequency.

    Each channel of the recorded data that is mapped to a golden channel is
    analyzed independently. Problems found on all channels are collected and
    reported together in a single TestFail.

    @param golden_file: An AudioTestData object that serves as golden data.
    @param recorder: An AudioWidget used in the test to record data.
    @param second_peak_ratio: The test fails when the second dominant
                              frequency has coefficient larger than this
                              ratio of the coefficient of first dominant
                              frequency.
    @param frequency_diff_threshold: The maximum difference between estimated
                                     frequency of test signal and golden
                                     frequency. This value should be small for
                                     signal passed through line.
    @param ignore_frequencies: A list of frequencies to be ignored. The
                               component in the spectral with frequency too
                               close to the frequency in the list will be
                               ignored. The comparison of frequencies uses
                               frequency_diff_threshold as well.
    @param check_anomaly: True to check anomaly in the signal.

    @raises error.TestFail if the recorded data does not contain sine tone of
            golden frequency.

    """
    data_format = recorder.data_format
    recorded_data = audio_data.AudioRawData(
            binary=recorder.get_binary(),
            channel=data_format['channel'],
            sample_format=data_format['sample_format'])

    def should_be_ignored(frequency):
        """Checks if frequency is close to any frequency in ignore list.

        Only called when ignore_frequencies is non-empty.

        @param frequency: The frequency to be tested.

        @returns: True if the frequency should be ignored. False otherwise.

        """
        for ignore_frequency in ignore_frequencies:
            if (abs(frequency - ignore_frequency) <
                frequency_diff_threshold):
                logging.debug('Ignore frequency: %s', frequency)
                return True
        return False

    errors = []

    for test_channel, golden_channel in enumerate(recorder.channel_map):
        # A None entry in the channel map means there is no golden channel
        # to compare this recorded channel against.
        if golden_channel is None:
            logging.info('Skipped channel %d', test_channel)
            continue

        signal = recorded_data.channel_data[test_channel]
        saturate_value = audio_data.get_maximum_value_from_sample_format(
                data_format['sample_format'])
        normalized_signal = audio_analysis.normalize_signal(
                signal, saturate_value)
        # spectral is indexed below as a sequence of (frequency, coefficient)
        # pairs; presumably ordered from most to least dominant -- confirm
        # against audio_analysis.spectral_analysis.
        spectral = audio_analysis.spectral_analysis(
                normalized_signal, data_format['rate'])

        if not spectral:
            errors.append(
                    'Channel %d: Can not find dominant frequency.' %
                            test_channel)
            # There is nothing to compare for this channel. Move on so that
            # spectral[0] below does not raise IndexError and hide the
            # errors collected so far.
            continue

        golden_frequency = golden_file.frequencies[golden_channel]
        logging.debug('Checking channel %s spectral %s against frequency %s',
                test_channel, spectral, golden_frequency)

        dominant_frequency = spectral[0][0]

        if (abs(dominant_frequency - golden_frequency) >
            frequency_diff_threshold):
            errors.append(
                    'Channel %d: Dominant frequency %s is away from golden %s' %
                    (test_channel, dominant_frequency, golden_frequency))

        if check_anomaly:
            detected_anomaly = audio_analysis.anomaly_detection(
                    signal=normalized_signal,
                    rate=data_format['rate'],
                    freq=golden_frequency)
            if detected_anomaly:
                errors.append(
                        'Channel %d: Detect anomaly near these time: %s' %
                        (test_channel, detected_anomaly))
            else:
                logging.info(
                        'Channel %d: Quality is good as there is no anomaly',
                        test_channel)

        # Filter out the frequencies to be ignored. This happens after the
        # dominant frequency check above, so it only affects the second peak
        # check below.
        if ignore_frequencies:
            spectral = [x for x in spectral if not should_be_ignored(x[0])]

        if len(spectral) > 1:
            first_coeff = spectral[0][1]
            second_coeff = spectral[1][1]
            if second_coeff > first_coeff * second_peak_ratio:
                errors.append(
                        'Channel %d: Found large second dominant frequencies: '
                        '%s' % (test_channel, spectral))

    if errors:
        raise error.TestFail(', '.join(errors))
350