1#!/usr/bin/python2
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11import logging
12import numpy
13import os
14import re
15import subprocess
16import tempfile
17import threading
18import time
19
20from glob import glob
21from autotest_lib.client.bin import test, utils
22from autotest_lib.client.bin.input.input_device import *
23from autotest_lib.client.common_lib import error
24from autotest_lib.client.cros.audio import audio_data
25from autotest_lib.client.cros.audio import cmd_utils
26from autotest_lib.client.cros.audio import cras_utils
27from autotest_lib.client.cros.audio import sox_utils
28from six.moves import range
29
30LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'
31
32_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics'
33
34_DEFAULT_NUM_CHANNELS = 2
35_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat'
36_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
37_DEFAULT_PLAYBACK_VOLUME = 100
38_DEFAULT_CAPTURE_GAIN = 2500
39_DEFAULT_ALSA_MAX_VOLUME = '100%'
40_DEFAULT_ALSA_CAPTURE_GAIN = '25dB'
41_DEFAULT_VOLUME_LEVEL = 75
42_DEFAULT_MIC_GAIN = 75
43
44# Minimum RMS value to pass when checking recorded file.
45_DEFAULT_SOX_RMS_THRESHOLD = 0.08
46
47_JACK_VALUE_ON_RE = re.compile(r'.*values=on')
48_HP_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Headphone\sJack')
49_MIC_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Mic\sJack')
50
51_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)')
52_SOX_ROUGH_FREQ_RE = re.compile(r'Rough\s+frequency:\s+(.+)')
53
54_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected'
55_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS'
56_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS'
57
58# Tools from platform/audiotest
59AUDIOFUNTEST_PATH = 'audiofuntest'
60AUDIOLOOP_PATH = 'looptest'
61LOOPBACK_LATENCY_PATH = 'loopback_latency'
62SOX_PATH = 'sox'
63TEST_TONES_PATH = 'test_tones'
64
65_MINIMUM_NORM = 0.001
66_CORRELATION_INDEX_THRESHOLD = 0.999
67# The minimum difference of estimated frequencies between two sine waves.
68_FREQUENCY_DIFF_THRESHOLD = 20
69# The minimum RMS value of meaningful audio data.
70_MEANINGFUL_RMS_THRESHOLD = 0.001
71
72def set_mixer_controls(mixer_settings={}, card='0'):
73    """Sets all mixer controls listed in the mixer settings on card.
74
75    @param mixer_settings: Mixer settings to set.
76    @param card: Index of audio card to set mixer settings for.
77    """
78    logging.info('Setting mixer control values on %s', card)
79    for item in mixer_settings:
80        logging.info('Setting %s to %s on card %s',
81                     item['name'], item['value'], card)
82        cmd = 'amixer -c %s cset name=%s %s'
83        cmd = cmd % (card, item['name'], item['value'])
84        try:
85            utils.system(cmd)
86        except error.CmdError:
87            # A card is allowed not to support all the controls, so don't
88            # fail the test here if we get an error.
89            logging.info('amixer command failed: %s', cmd)
90
91def set_default_volume_levels():
92    """Sets the default volume and default capture gain through cras_test_client.
93
94    """
95    logging.info('Setting audio levels to their defaults')
96    set_volume_levels(_DEFAULT_VOLUME_LEVEL, _DEFAULT_MIC_GAIN)
97
98def set_volume_levels(volume, capture):
99    """Sets the volume and capture gain through cras_test_client.
100
101    @param volume: The playback volume to set.
102    @param capture: The capture gain to set.
103    """
104    logging.info('Setting volume: %d capture: %d', volume, capture)
105    try:
106        utils.system('/usr/bin/cras_test_client --volume %d' % volume)
107        utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture)
108        utils.system('/usr/bin/cras_test_client --dump_server_info')
109        utils.system('/usr/bin/cras_test_client --mute 0')
110    except error.CmdError as e:
111        raise error.TestError(
112                '*** Can not tune volume through CRAS. *** (' + str(e) + ')')
113
114    try:
115        utils.system('amixer -c 0 contents')
116    except error.CmdError as e:
117        logging.info('amixer command failed: %s', str(e))
118
119def loopback_latency_check(**args):
120    """Checks loopback latency.
121
122    @param args: additional arguments for loopback_latency.
123
124    @return A tuple containing measured and reported latency in uS.
125        Return None if no audio detected.
126    """
127    noise_threshold = str(args['n']) if 'n' in args else '400'
128
129    cmd = '%s -n %s -c' % (LOOPBACK_LATENCY_PATH, noise_threshold)
130
131    output = utils.system_output(cmd, retain_output=True)
132
133    # Sleep for a short while to make sure device is not busy anymore
134    # after called loopback_latency.
135    time.sleep(.1)
136
137    measured_latency = None
138    reported_latency = None
139    for line in output.split('\n'):
140        match = re.search(_MEASURED_LATENCY_RE, line, re.I)
141        if match:
142            measured_latency = int(match.group(1))
143            continue
144        match = re.search(_REPORTED_LATENCY_RE, line, re.I)
145        if match:
146            reported_latency = int(match.group(1))
147            continue
148        if re.search(_AUDIO_NOT_FOUND_RE, line, re.I):
149            return None
150    if measured_latency and reported_latency:
151        return (measured_latency, reported_latency)
152    else:
153        # Should not reach here, just in case.
154        return None
155
156def get_mixer_jack_status(jack_reg_exp):
157    """Gets the mixer jack status.
158
159    @param jack_reg_exp: The regular expression to match jack control name.
160
161    @return None if the control does not exist, return True if jack control
162        is detected plugged, return False otherwise.
163    """
164    output = utils.system_output('amixer -c0 controls', retain_output=True)
165    numid = None
166    for line in output.split('\n'):
167        m = jack_reg_exp.match(line)
168        if m:
169            numid = m.group(1)
170            break
171
172    # Proceed only when matched numid is not empty.
173    if numid:
174        output = utils.system_output('amixer -c0 cget numid=%s' % numid)
175        for line in output.split('\n'):
176            if _JACK_VALUE_ON_RE.match(line):
177                return True
178        return False
179    else:
180        return None
181
182def get_hp_jack_status():
183    """Gets the status of headphone jack."""
184    status = get_mixer_jack_status(_HP_JACK_CONTROL_RE)
185    if status is not None:
186        return status
187
188    # When headphone jack is not found in amixer, lookup input devices
189    # instead.
190    #
191    # TODO(hychao): Check hp/mic jack status dynamically from evdev. And
192    # possibly replace the existing check using amixer.
193    for evdev in glob('/dev/input/event*'):
194        device = InputDevice(evdev)
195        if device.is_hp_jack():
196            return device.get_headphone_insert()
197    else:
198        return None
199
200def get_mic_jack_status():
201    """Gets the status of mic jack."""
202    status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE)
203    if status is not None:
204        return status
205
206    # When mic jack is not found in amixer, lookup input devices instead.
207    for evdev in glob('/dev/input/event*'):
208        device = InputDevice(evdev)
209        if device.is_mic_jack():
210            return device.get_microphone_insert()
211    else:
212        return None
213
214# Functions to test audio palyback.
215def play_sound(duration_seconds=None, audio_file_path=None):
216    """Plays a sound file found at |audio_file_path| for |duration_seconds|.
217
218    If |audio_file_path|=None, plays a default audio file.
219    If |duration_seconds|=None, plays audio file in its entirety.
220
221    @param duration_seconds: Duration to play sound.
222    @param audio_file_path: Path to the audio file.
223    """
224    if not audio_file_path:
225        audio_file_path = '/usr/local/autotest/cros/audio/sine440.wav'
226    duration_arg = ('-d %d' % duration_seconds) if duration_seconds else ''
227    utils.system('aplay %s %s' % (duration_arg, audio_file_path))
228
229def get_play_sine_args(channel, odev='default', freq=1000, duration=10,
230                       sample_size=16):
231    """Gets the command args to generate a sine wav to play to odev.
232
233    @param channel: 0 for left, 1 for right; otherwize, mono.
234    @param odev: alsa output device.
235    @param freq: frequency of the generated sine tone.
236    @param duration: duration of the generated sine tone.
237    @param sample_size: output audio sample size. Default to 16.
238    """
239    cmdargs = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa',
240               odev, 'synth', str(duration)]
241    if channel == 0:
242        cmdargs += ['sine', str(freq), 'sine', '0']
243    elif channel == 1:
244        cmdargs += ['sine', '0', 'sine', str(freq)]
245    else:
246        cmdargs += ['sine', str(freq)]
247
248    return cmdargs
249
250def play_sine(channel, odev='default', freq=1000, duration=10,
251              sample_size=16):
252    """Generates a sine wave and plays to odev.
253
254    @param channel: 0 for left, 1 for right; otherwize, mono.
255    @param odev: alsa output device.
256    @param freq: frequency of the generated sine tone.
257    @param duration: duration of the generated sine tone.
258    @param sample_size: output audio sample size. Default to 16.
259    """
260    cmdargs = get_play_sine_args(channel, odev, freq, duration, sample_size)
261    utils.system(' '.join(cmdargs))
262
263def get_audio_rms(sox_output):
264    """Gets the audio RMS value from sox stat output
265
266    @param sox_output: Output of sox stat command.
267
268    @return The RMS value parsed from sox stat output.
269    """
270    for rms_line in sox_output.split('\n'):
271        m = _SOX_RMS_AMPLITUDE_RE.match(rms_line)
272        if m is not None:
273            return float(m.group(1))
274
275def get_rough_freq(sox_output):
276    """Gets the rough audio frequency from sox stat output
277
278    @param sox_output: Output of sox stat command.
279
280    @return The rough frequency value parsed from sox stat output.
281    """
282    for rms_line in sox_output.split('\n'):
283        m = _SOX_ROUGH_FREQ_RE.match(rms_line)
284        if m is not None:
285            return int(m.group(1))
286
287def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD):
288    """Checks if the calculated RMS value is expected.
289
290    @param sox_output: The output from sox stat command.
291    @param sox_threshold: The threshold to test RMS value against.
292
293    @raises error.TestError if RMS amplitude can't be parsed.
294    @raises error.TestFail if the RMS amplitude of the recording isn't above
295            the threshold.
296    """
297    rms_val = get_audio_rms(sox_output)
298
299    # In case we don't get a valid RMS value.
300    if rms_val is None:
301        raise error.TestError(
302            'Failed to generate an audio RMS value from playback.')
303
304    logging.info('Got audio RMS value of %f. Minimum pass is %f.',
305                 rms_val, sox_threshold)
306    if rms_val < sox_threshold:
307        raise error.TestFail(
308            'Audio RMS value %f too low. Minimum pass is %f.' %
309            (rms_val, sox_threshold))
310
311def noise_reduce_file(in_file, noise_file, out_file,
312                      sox_format=_DEFAULT_SOX_FORMAT):
313    """Runs the sox command to reduce noise.
314
315    Runs the sox command to noise-reduce in_file using the noise
316    profile from noise_file.
317
318    @param in_file: The file to noise reduce.
319    @param noise_file: The file containing the noise profile.
320        This can be created by recording silence.
321    @param out_file: The file contains the noise reduced sound.
322    @param sox_format: The  sox format to generate sox command.
323    """
324    prof_cmd = '%s -c 2 %s %s -n noiseprof' % (SOX_PATH,
325               sox_format, noise_file)
326    reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' %
327            (SOX_PATH, sox_format, in_file, sox_format, out_file))
328    utils.system('%s | %s' % (prof_cmd, reduce_cmd))
329
330def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND):
331    """Records a sample from the default input device.
332
333    @param tmpfile: The file to record to.
334    @param record_command: The command to record audio.
335    """
336    utils.system('%s %s' % (record_command, tmpfile))
337
338def create_wav_file(wav_dir, prefix=""):
339    """Creates a unique name for wav file.
340
341    The created file name will be preserved in autotest result directory
342    for future analysis.
343
344    @param wav_dir: The directory of created wav file.
345    @param prefix: specified file name prefix.
346    """
347    filename = "%s-%s.wav" % (prefix, time.time())
348    return os.path.join(wav_dir, filename)
349
350def run_in_parallel(*funs):
351    """Runs methods in parallel.
352
353    @param funs: methods to run.
354    """
355    threads = []
356    for f in funs:
357        t = threading.Thread(target=f)
358        t.start()
359        threads.append(t)
360
361    for t in threads:
362        t.join()
363
364def get_channel_sox_stat(
365        input_audio, channel_index, channels=2, bits=16, rate=48000):
366    """Gets the sox stat info of the selected channel in the input audio file.
367
368    @param input_audio: The input audio file to be analyzed.
369    @param channel_index: The index of the channel to be analyzed.
370                          (1 for the first channel).
371    @param channels: The number of channels in the input audio.
372    @param bits: The number of bits of each audio sample.
373    @param rate: The sampling rate.
374    """
375    if channel_index <= 0 or channel_index > channels:
376        raise ValueError('incorrect channel_indexi: %d' % channel_index)
377
378    if channels == 1:
379        return sox_utils.get_stat(
380                input_audio, channels=channels, bits=bits, rate=rate)
381
382    p1 = cmd_utils.popen(
383            sox_utils.extract_channel_cmd(
384                    input_audio, '-', channel_index,
385                    channels=channels, bits=bits, rate=rate),
386            stdout=subprocess.PIPE)
387    p2 = cmd_utils.popen(
388            sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate),
389            stdin=p1.stdout, stderr=subprocess.PIPE)
390    stat_output = p2.stderr.read()
391    cmd_utils.wait_and_check_returncode(p1, p2)
392    return sox_utils.parse_stat_output(stat_output)
393
394
395def get_rms(input_audio, channels=1, bits=16, rate=48000):
396    """Gets the RMS values of all channels of the input audio.
397
398    @param input_audio: The input audio file to be checked.
399    @param channels: The number of channels in the input audio.
400    @param bits: The number of bits of each audio sample.
401    @param rate: The sampling rate.
402    """
403    stats = [get_channel_sox_stat(
404            input_audio, i + 1, channels=channels, bits=bits,
405            rate=rate) for i in range(channels)]
406
407    logging.info('sox stat: %s', [str(s) for s in stats])
408    return [s.rms for s in stats]
409
410
411def reduce_noise_and_get_rms(
412        input_audio, noise_file, channels=1, bits=16, rate=48000):
413    """Reduces noise in the input audio by the given noise file and then gets
414    the RMS values of all channels of the input audio.
415
416    @param input_audio: The input audio file to be analyzed.
417    @param noise_file: The noise file used to reduce noise in the input audio.
418    @param channels: The number of channels in the input audio.
419    @param bits: The number of bits of each audio sample.
420    @param rate: The sampling rate.
421    """
422    with tempfile.NamedTemporaryFile() as reduced_file:
423        p1 = cmd_utils.popen(
424                sox_utils.noise_profile_cmd(
425                        noise_file, '-', channels=channels, bits=bits,
426                        rate=rate),
427                stdout=subprocess.PIPE)
428        p2 = cmd_utils.popen(
429                sox_utils.noise_reduce_cmd(
430                        input_audio, reduced_file.name, '-',
431                        channels=channels, bits=bits, rate=rate),
432                stdin=p1.stdout)
433        cmd_utils.wait_and_check_returncode(p1, p2)
434        return get_rms(reduced_file.name, channels, bits, rate)
435
436
437def cras_rms_test_setup():
438    """Setups for the cras_rms_tests.
439
440    To make sure the line_out-to-mic_in path is all green.
441    """
442    # TODO(owenlin): Now, the nodes are choosed by chrome.
443    #                We should do it here.
444    cras_utils.set_system_volume(_DEFAULT_PLAYBACK_VOLUME)
445    cras_utils.set_selected_output_node_volume(_DEFAULT_PLAYBACK_VOLUME)
446
447    cras_utils.set_system_mute(False)
448    cras_utils.set_capture_mute(False)
449
450
451def dump_rms_postmortem(result_dir):
452    """Dumps postmortem for rms tests."""
453    try:
454        dump_audio_diagnostics(
455                os.path.join(result_dir, "audio_diagnostics.txt"))
456    except Exception:
457        logging.exception('Error while generating postmortem report')
458
459
460def dump_audio_diagnostics(file_path=None):
461    """Dumps audio diagnostics results to a file
462
463    Dumps the result of audio_diagnostics to a file. Returns a string
464    containing the result if the file_path is not specified.
465
466    @returns: None if 'file_path' is specified, otherwise, a string containing
467    the audio diagnostic results.
468    """
469    if file_path:
470        with open(file_path, 'w') as f:
471            return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=f)
472
473    return cmd_utils.execute([_AUDIO_DIAGNOSTICS_PATH], stdout=subprocess.PIPE)
474
475
476def get_max_cross_correlation(signal_a, signal_b):
477    """Gets max cross-correlation and best time delay of two signals.
478
479    Computes cross-correlation function between two
480    signals and gets the maximum value and time delay.
481    The steps includes:
482      1. Compute cross-correlation function of X and Y and get Cxy.
483         The correlation function Cxy is an array where Cxy[k] is the
484         cross product of X and Y when Y is delayed by k.
485         Refer to manual of numpy.correlate for detail of correlation.
486      2. Find the maximum value C_max and index C_index in Cxy.
487      3. Compute L2 norm of X and Y to get norm(X) and norm(Y).
488      4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation.
489
490    Max cross-correlation indicates the similarity of X and Y. The value
491    is 1 if X equals Y multiplied by a positive scalar.
492    The value is -1 if X equals Y multiplied by a negative scaler.
493    Any constant level shift will be regarded as distortion and will make
494    max cross-correlation value deviated from 1.
495    C_index is the best time delay of Y that make Y looks similar to X.
496    Refer to http://en.wikipedia.org/wiki/Cross-correlation.
497
498    @param signal_a: A list of numbers which contains the first signal.
499    @param signal_b: A list of numbers which contains the second signal.
500
501    @raises: ValueError if any number in signal_a or signal_b is not a float.
502             ValueError if norm of any array is less than _MINIMUM_NORM.
503
504    @returns: A tuple (correlation index, best delay). If there are more than
505              one best delay, just return the first one.
506    """
507    def check_list_contains_float(numbers):
508        """Checks the elements in a list are all float.
509
510        @param numbers: A list of numbers.
511
512        @raises: ValueError if there is any element which is not a float
513                 in the list.
514        """
515        if any(not isinstance(x, float) for x in numbers):
516            raise ValueError('List contains number which is not a float')
517
518    check_list_contains_float(signal_a)
519    check_list_contains_float(signal_b)
520
521    norm_a = numpy.linalg.norm(signal_a)
522    norm_b = numpy.linalg.norm(signal_b)
523    logging.debug('norm_a: %f', norm_a)
524    logging.debug('norm_b: %f', norm_b)
525    if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM:
526        raise ValueError('No meaningful data as norm is too small.')
527
528    correlation = numpy.correlate(signal_a, signal_b, 'full')
529    max_correlation = max(correlation)
530    best_delays = [i for i, j in enumerate(correlation) if j == max_correlation]
531    if len(best_delays) > 1:
532        logging.warning('There are more than one best delay: %r', best_delays)
533    return max_correlation / (norm_a * norm_b), best_delays[0]
534
535
536def trim_data(data, threshold=0):
537    """Trims a data by removing value that is too small in head and tail.
538
539    Removes elements in head and tail whose absolute value is smaller than
540    or equal to threshold.
541    E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) =
542    ([0.2, 0.3, 0.2], 2)
543
544    @param data: A list of numbers.
545    @param threshold: The threshold to compare against.
546
547    @returns: A tuple (trimmed_data, end_trimmed_length), where
548              end_trimmed_length is the length of original data being trimmed
549              from the end.
550              Returns ([], None) if there is no valid data.
551    """
552    indice_valid = [
553            i for i, j in enumerate(data) if abs(j) > threshold]
554    if not indice_valid:
555        logging.warning(
556                'There is no element with absolute value greater '
557                'than threshold %f', threshold)
558        return [], None
559    logging.debug('Start and end of indice_valid: %d, %d',
560                  indice_valid[0], indice_valid[-1])
561    end_trimmed_length = len(data) - indice_valid[-1] - 1
562    logging.debug('Trimmed length in the end: %d', end_trimmed_length)
563    return (data[indice_valid[0] : indice_valid[-1] + 1], end_trimmed_length)
564
565
566def get_one_channel_correlation(test_data, golden_data):
567    """Gets max cross-correlation of test_data and golden_data.
568
569    Trims test data and compute the max cross-correlation against golden_data.
570    Signal can be trimmed because those zero values in the head and tail of
571    a signal will not affect correlation computation.
572
573    @param test_data: A list containing the data to compare against golden data.
574    @param golden_data: A list containing the golden data.
575
576    @returns: A tuple (max cross-correlation, best_delay) if data is valid.
577              Otherwise returns (None, None). Refer to docstring of
578              get_max_cross_correlation.
579    """
580    trimmed_test_data, end_trimmed_length = trim_data(test_data)
581
582    def to_float(samples):
583        """Casts elements in the list to float.
584
585      @param samples: A list of numbers.
586
587      @returns: A list of original numbers casted to float.
588      """
589        samples_float = [float(x) for x in samples]
590        return samples_float
591
592    max_cross_correlation, best_delay =  get_max_cross_correlation(
593            to_float(golden_data),
594            to_float(trimmed_test_data))
595
596    # The reason to add back the trimmed length in the end.
597    # E.g.:
598    # golden data:
599    #
600    # |-----------vvvv----------------|  vvvv is the signal of interest.
601    #       a                 b
602    #
603    # test data:
604    #
605    # |---x----vvvv--------x----------------|  x is the place to trim.
606    #   c   d         e            f
607    #
608    # trimmed test data:
609    #
610    # |----vvvv--------|
611    #   d         e
612    #
613    # The first output of cross correlation computation :
614    #
615    #                  |-----------vvvv----------------|
616    #                       a                 b
617    #
618    # |----vvvv--------|
619    #   d         e
620    #
621    # The largest output of cross correlation computation happens at
622    # delay a + e.
623    #
624    #                  |-----------vvvv----------------|
625    #                       a                 b
626    #
627    #                         |----vvvv--------|
628    #                           d         e
629    #
630    # Cross correlation starts computing by aligning the last sample
631    # of the trimmed test data to the first sample of golden data.
632    # The best delay calculated from trimmed test data and golden data
633    # cross correlation is e + a. But the real best delay that should be
634    # identical on two channel should be e + a + f.
635    # So we need to add back the length being trimmed in the end.
636
637    if max_cross_correlation:
638        return max_cross_correlation, best_delay + end_trimmed_length
639    else:
640        return None, None
641
642
643def compare_one_channel_correlation(test_data, golden_data, parameters):
644    """Compares two one-channel data by correlation.
645
646    @param test_data: A list containing the data to compare against golden data.
647    @param golden_data: A list containing the golden data.
648    @param parameters: A dict containing parameters for method.
649
650    @returns: A dict containing:
651              index: The index of similarity where 1 means they are different
652                  only by a positive scale.
653              best_delay: The best delay of test data in relative to golden
654                  data.
655              equal: A bool containing comparing result.
656    """
657    if 'correlation_threshold' in parameters:
658        threshold = parameters['correlation_threshold']
659    else:
660        threshold = _CORRELATION_INDEX_THRESHOLD
661
662    result_dict = dict()
663    max_cross_correlation, best_delay = get_one_channel_correlation(
664            test_data, golden_data)
665    result_dict['index'] = max_cross_correlation
666    result_dict['best_delay'] = best_delay
667    result_dict['equal'] = True if (
668        max_cross_correlation and
669        max_cross_correlation > threshold) else False
670    logging.debug('result_dict: %r', result_dict)
671    return result_dict
672
673
674def compare_data_correlation(golden_data_binary, golden_data_format,
675                             test_data_binary, test_data_format,
676                             channel_map, parameters=None):
677    """Compares two raw data using correlation.
678
679    @param golden_data_binary: The binary containing golden data.
680    @param golden_data_format: The data format of golden data.
681    @param test_data_binary: The binary containing test data.
682    @param test_data_format: The data format of test data.
683    @param channel_map: A list containing channel mapping.
684                        E.g. [1, 0, None, None, None, None, None, None] means
685                        channel 0 of test data should map to channel 1 of
686                        golden data. Channel 1 of test data should map to
687                        channel 0 of golden data. Channel 2 to 7 of test data
688                        should be skipped.
689    @param parameters: A dict containing parameters for method, if needed.
690
691    @raises: NotImplementedError if file type is not raw.
692             NotImplementedError if sampling rates of two data are not the same.
693             error.TestFail if golden data and test data are not equal.
694    """
695    if parameters is None:
696        parameters = dict()
697
698    if (golden_data_format['file_type'] != 'raw' or
699        test_data_format['file_type'] != 'raw'):
700        raise NotImplementedError('Only support raw data in compare_data.')
701    if (golden_data_format['rate'] != test_data_format['rate']):
702        raise NotImplementedError(
703                'Only support comparing data with the same sampling rate')
704    golden_data = audio_data.AudioRawData(
705            binary=golden_data_binary,
706            channel=golden_data_format['channel'],
707            sample_format=golden_data_format['sample_format'])
708    test_data = audio_data.AudioRawData(
709            binary=test_data_binary,
710            channel=test_data_format['channel'],
711            sample_format=test_data_format['sample_format'])
712    compare_results = []
713    for test_channel, golden_channel in enumerate(channel_map):
714        if golden_channel is None:
715            logging.info('Skipped channel %d', test_channel)
716            continue
717        test_data_one_channel = test_data.channel_data[test_channel]
718        golden_data_one_channel = golden_data.channel_data[golden_channel]
719        result_dict = dict(test_channel=test_channel,
720                           golden_channel=golden_channel)
721        result_dict.update(
722                compare_one_channel_correlation(
723                        test_data_one_channel, golden_data_one_channel,
724                        parameters))
725        compare_results.append(result_dict)
726    logging.info('compare_results: %r', compare_results)
727    for result in compare_results:
728        if not result['equal']:
729            error_msg = ('Failed on test channel %d and golden channel %d with '
730                         'index %f') % (
731                                 result['test_channel'],
732                                 result['golden_channel'],
733                                 result['index'])
734            logging.error(error_msg)
735            raise error.TestFail(error_msg)
736    # Also checks best delay are exactly the same.
737    best_delays = set([result['best_delay'] for result in compare_results])
738    if len(best_delays) > 1:
739        error_msg = 'There are more than one best delay: %s' % best_delays
740        logging.error(error_msg)
741        raise error.TestFail(error_msg)
742
743
744def recorded_filesize_check(filesize,
745                            duration,
746                            channels=1,
747                            rate=48000,
748                            bits=16,
749                            tolerant_ratio=0.1):
750    """Checks the recorded file size is correct. The check will pass if the
751    file size is larger than expected and smaller than our tolerant size. We
752    can tolerate size larger than expected as the way cras_test_client stop
753    recording base on duration is not always accurate but should record longer
754    than expected audio.
755
756    @param filesize: Actually file size.
757    @param duration: Expected seconds to record.
758    @param channels: Number of channels to record.
759    @param rate: Recording sample rate.
760    @param bits: The sample bit width.
761    @param tolerant_ratio: The larger than expected file size ratio that we
762    regard as pass.
763
764    @raises: TestFail if the size is less or larger than expect.
765    """
766    expected = duration * channels * (bits // 8) * rate
767    ratio = abs(float(filesize) / expected)
768    if ratio < 1 or ratio > 1 + tolerant_ratio:
769        raise error.TestFail(
770                'File size not correct: %d expect: %d' % (filesize, expected))
771
772
773class _base_rms_test(test.test):
774    """Base class for all rms_test """
775
776    def postprocess(self):
777        super(_base_rms_test, self).postprocess()
778
779        # Sum up the number of failed constraints in each iteration
780        if sum(len(x) for x in self.failed_constraints):
781            dump_audio_diagnostics(test.resultsdir)
782
783
784class chrome_rms_test(_base_rms_test):
785    """Base test class for audio RMS test with Chrome.
786
787    The chrome instance can be accessed by self.chrome.
788    """
789    def warmup(self):
790        super(chrome_rms_test, self).warmup()
791
792        # Not all client of this file using telemetry.
793        # Just do the import here for those who really need it.
794        from autotest_lib.client.common_lib.cros import chrome
795
796        self.chrome = chrome.Chrome(init_network_controller=True)
797
798        # The audio configuration could be changed when we
799        # restart chrome.
800        try:
801            cras_rms_test_setup()
802        except Exception:
803            self.chrome.browser.Close()
804            raise
805
806
807    def cleanup(self, *args):
808        try:
809            self.chrome.close()
810        finally:
811            super(chrome_rms_test, self).cleanup()
812
813class cras_rms_test(_base_rms_test):
814    """Base test class for CRAS audio RMS test."""
815
816    def warmup(self):
817        super(cras_rms_test, self).warmup()
818        # Stop ui to make sure there are not other streams.
819        utils.stop_service('ui', ignore_status=True)
820        cras_rms_test_setup()
821
822    def cleanup(self, *args):
823        # Restart ui.
824        utils.start_service('ui', ignore_status=True)
825
826
827class alsa_rms_test(_base_rms_test):
828    """Base test class for ALSA audio RMS test.
829
830    Note the warmup will take 10 seconds and the device cannot be used before it
831    returns.
832    """
833    def warmup(self):
834        super(alsa_rms_test, self).warmup()
835
836        cras_rms_test_setup()
837        # We need CRAS to initialize the volume and gain.
838        cras_utils.playback(playback_file="/dev/zero", duration=1)
839        # CRAS will release the device after 10 seconds.
840        time.sleep(11)
841