1#!/usr/bin/python
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6
7import logging
8import numpy
9import os
10import re
11import tempfile
12import threading
13import time
14
15from glob import glob
16from autotest_lib.client.bin import test, utils
17from autotest_lib.client.bin.input.input_device import *
18from autotest_lib.client.common_lib import error
19from autotest_lib.client.cros.audio import alsa_utils
20from autotest_lib.client.cros.audio import audio_data
21from autotest_lib.client.cros.audio import cmd_utils
22from autotest_lib.client.cros.audio import cras_utils
23from autotest_lib.client.cros.audio import sox_utils
24
LD_LIBRARY_PATH = 'LD_LIBRARY_PATH'

# Executable run by get_audio_diagnostics().
_AUDIO_DIAGNOSTICS_PATH = '/usr/bin/audio_diagnostics'

# Defaults used by the sox/arecord based helpers in this file.
_DEFAULT_NUM_CHANNELS = 2
_DEFAULT_REC_COMMAND = 'arecord -D hw:0,0 -d 10 -f dat'
_DEFAULT_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
_DEFAULT_PLAYBACK_VOLUME = 100
_DEFAULT_CAPTURE_GAIN = 2500
_DEFAULT_ALSA_MAX_VOLUME = '100%'
_DEFAULT_ALSA_CAPTURE_GAIN = '25dB'

# Minimum RMS value to pass when checking recorded file.
_DEFAULT_SOX_RMS_THRESHOLD = 0.08

# All patterns below are raw strings so that regex escapes such as \d and \s
# reach the re module untouched instead of being parsed as string escapes.
_JACK_VALUE_ON_RE = re.compile(r'.*values=on')
_HP_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Headphone\sJack')
_MIC_JACK_CONTROL_RE = re.compile(r'numid=(\d+).*Mic\sJack')

_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)')
_SOX_ROUGH_FREQ_RE = re.compile(r'Rough\s+frequency:\s+(.+)')

# Patterns matched against each output line of loopback_latency.
_AUDIO_NOT_FOUND_RE = r'Audio\snot\sdetected'
_MEASURED_LATENCY_RE = r'Measured\sLatency:\s(\d+)\suS'
_REPORTED_LATENCY_RE = r'Reported\sLatency:\s(\d+)\suS'

# Tools from platform/audiotest
AUDIOFUNTEST_PATH = 'audiofuntest'
AUDIOLOOP_PATH = 'looptest'
LOOPBACK_LATENCY_PATH = 'loopback_latency'
SOX_PATH = 'sox'
TEST_TONES_PATH = 'test_tones'

# Signals whose L2 norm is not above this value are rejected by
# get_max_cross_correlation().
_MINIMUM_NORM = 0.001
# Default correlation index above which two channels are considered equal.
_CORRELATION_INDEX_THRESHOLD = 0.999
# The minimum difference of estimated frequencies between two sine waves.
_FREQUENCY_DIFF_THRESHOLD = 20
# The minimum RMS value of meaningful audio data.
_MEANINGFUL_RMS_THRESHOLD = 0.001
64
def set_mixer_controls(mixer_settings=None, card='0'):
    """Sets all mixer controls listed in the mixer settings on card.

    Each control is set with 'amixer cset'. A failing amixer command is
    logged and skipped rather than treated as a test failure.

    @param mixer_settings: An iterable of dicts, each providing the 'name'
        and 'value' of one mixer control. None (the default) means there
        is nothing to set.
    @param card: Index of audio card to set mixer settings for.
    """
    # Use None instead of a mutable default argument shared across calls.
    if mixer_settings is None:
        mixer_settings = []

    logging.info('Setting mixer control values on %s', card)
    for item in mixer_settings:
        logging.info('Setting %s to %s on card %s',
                     item['name'], item['value'], card)
        cmd = 'amixer -c %s cset name=%s %s'
        cmd = cmd % (card, item['name'], item['value'])
        try:
            utils.system(cmd)
        except error.CmdError:
            # A card is allowed not to support all the controls, so don't
            # fail the test here if we get an error.
            logging.info('amixer command failed: %s', cmd)
83
def set_volume_levels(volume, capture):
    """Sets the volume and capture gain through cras_test_client.

    After setting both levels this also runs cras_test_client with
    --dump_server_info and --mute 0, and then 'amixer -c 0 contents'.

    @param volume: The playback volume to set.
    @param capture: The capture gain to set.

    @raises error.TestError if any of the commands fails.
    """
    logging.info('Setting volume level to %d', volume)
    try:
        utils.system('/usr/bin/cras_test_client --volume %d' % volume)
        logging.info('Setting capture gain to %d', capture)
        utils.system('/usr/bin/cras_test_client --capture_gain %d' % capture)
        utils.system('/usr/bin/cras_test_client --dump_server_info')
        utils.system('/usr/bin/cras_test_client --mute 0')
        utils.system('amixer -c 0 contents')
    # 'except ... as' is accepted by Python 2.6+ and Python 3, unlike the
    # comma form.
    except error.CmdError as e:
        raise error.TestError(
                '*** Can not tune volume through CRAS. *** (' + str(e) + ')')
101
def loopback_latency_check(**args):
    """Checks loopback latency.

    Runs the loopback_latency tool and parses the measured and reported
    latency from its output.

    @param args: additional arguments for loopback_latency. Only 'n', the
        noise threshold passed with -n, is used; it defaults to 400.

    @return A tuple containing measured and reported latency in uS.
        Return None if no audio detected, or if either latency is missing
        from the output or was parsed as zero.
    """
    # dict.get() replaces the Python 2-only dict.has_key().
    noise_threshold = str(args.get('n', 400))

    cmd = '%s -n %s' % (LOOPBACK_LATENCY_PATH, noise_threshold)

    output = utils.system_output(cmd, retain_output=True)

    # Sleep for a short while to make sure device is not busy anymore
    # after called loopback_latency.
    time.sleep(.1)

    measured_latency = None
    reported_latency = None
    for line in output.split('\n'):
        match = re.search(_MEASURED_LATENCY_RE, line, re.I)
        if match:
            measured_latency = int(match.group(1))
            continue
        match = re.search(_REPORTED_LATENCY_RE, line, re.I)
        if match:
            reported_latency = int(match.group(1))
            continue
        if re.search(_AUDIO_NOT_FOUND_RE, line, re.I):
            return None
    # Truthiness check: a latency of 0 is treated the same as a missing one.
    if measured_latency and reported_latency:
        return (measured_latency, reported_latency)
    else:
        # Should not reach here, just in case.
        return None
138
def get_mixer_jack_status(jack_reg_exp):
    """Looks up a jack control on card 0 with amixer and reads its state.

    @param jack_reg_exp: Compiled regular expression matching the jack
        control line; its first group must capture the control numid.

    @return None if no control line matches, True if the control reports
        'values=on', False otherwise.
    """
    controls = utils.system_output('amixer -c0 controls', retain_output=True)

    # Take the numid from the first control line that matches.
    matches = (jack_reg_exp.match(entry) for entry in controls.split('\n'))
    first_match = next((found for found in matches if found), None)
    control_id = first_match.group(1) if first_match else None

    # Without a usable numid there is no jack control to query.
    if not control_id:
        return None

    values = utils.system_output('amixer -c0 cget numid=%s' % control_id)
    for value_line in values.split('\n'):
        if _JACK_VALUE_ON_RE.match(value_line):
            return True
    return False
164
def get_hp_jack_status():
    """Gets the status of headphone jack.

    @return The amixer jack status when the mixer control exists. Otherwise
        the result of get_headphone_insert() on the first input device that
        reports is_hp_jack(), or None if there is no such device.
    """
    mixer_status = get_mixer_jack_status(_HP_JACK_CONTROL_RE)
    if mixer_status is not None:
        return mixer_status

    # When headphone jack is not found in amixer, lookup input devices
    # instead.
    #
    # TODO(hychao): Check hp/mic jack status dynamically from evdev. And
    # possibly replace the existing check using amixer.
    for event_path in glob('/dev/input/event*'):
        candidate = InputDevice(event_path)
        if candidate.is_hp_jack():
            return candidate.get_headphone_insert()
    return None
182
def get_mic_jack_status():
    """Gets the status of mic jack.

    @return The amixer jack status when the mixer control exists. Otherwise
        the result of get_microphone_insert() on the first input device that
        reports is_mic_jack(), or None if there is no such device.
    """
    mixer_status = get_mixer_jack_status(_MIC_JACK_CONTROL_RE)
    if mixer_status is not None:
        return mixer_status

    # When mic jack is not found in amixer, lookup input devices instead.
    for event_path in glob('/dev/input/event*'):
        candidate = InputDevice(event_path)
        if candidate.is_mic_jack():
            return candidate.get_microphone_insert()
    return None
196
def log_loopback_dongle_status():
    """Logs the loopback dongle status to show whether it is equipped.

    Logs the mic jack status, the headphone jack status and the result of a
    loopback latency check, followed by an overall PASS/FAIL line. PASS
    requires both jacks to be reported plugged and the latency check to
    return a result.
    """
    # Check Mic Jack
    mic_plugged = get_mic_jack_status()
    logging.info('Mic jack status: %s', mic_plugged)

    # Check Headphone Jack
    hp_plugged = get_hp_jack_status()
    logging.info('Headphone jack status: %s', hp_plugged)

    # Use latency check to test if audio can be captured through dongle.
    # We only want to know the basic function of dongle, so no need to
    # assert the latency accuracy here.
    latency = loopback_latency_check(n=4000)
    if latency:
        logging.info('Got latency measured %d, reported %d',
                latency[0], latency[1])
    else:
        logging.info('Latency check fail.')

    all_checks_ok = bool(mic_plugged) and bool(hp_plugged) and bool(latency)
    if all_checks_ok:
        verdict = 'PASS'
    else:
        verdict = 'FAIL'
    logging.info('audio loopback dongle test: %s', verdict)
224
# Functions to test audio playback.
def play_sound(duration_seconds=None, audio_file_path=None):
    """Plays |audio_file_path| with aplay for |duration_seconds|.

    A false |audio_file_path| (None or empty) selects the default sine440.wav
    file. A false |duration_seconds| (None or 0) omits the -d option so the
    file is played in its entirety.

    @param duration_seconds: Duration to play sound.
    @param audio_file_path: Path to the audio file.
    """
    sound_path = (audio_file_path or
                  '/usr/local/autotest/cros/audio/sine440.wav')
    if duration_seconds:
        duration_option = '-d %d' % duration_seconds
    else:
        duration_option = ''
    utils.system('aplay %s %s' % (duration_option, sound_path))
239
def get_play_sine_args(channel, odev='default', freq=1000, duration=10,
                       sample_size=16):
    """Builds the sox argument list that synthesizes a sine tone to odev.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.

    @return A list of strings: the sox executable followed by its arguments.
    """
    tone = str(freq)
    # One 'sine' entry per synthesized channel; a frequency of 0 is used for
    # the channel that should not carry the tone.
    if channel == 0:
        per_channel_freqs = [tone, '0']
    elif channel == 1:
        per_channel_freqs = ['0', tone]
    else:
        per_channel_freqs = [tone]

    sox_args = [SOX_PATH, '-b', str(sample_size), '-n', '-t', 'alsa',
                odev, 'synth', str(duration)]
    for channel_freq in per_channel_freqs:
        sox_args.extend(['sine', channel_freq])
    return sox_args
260
def play_sine(channel, odev='default', freq=1000, duration=10,
              sample_size=16):
    """Synthesizes a sine tone with sox and plays it to odev.

    @param channel: 0 for left, 1 for right; otherwise, mono.
    @param odev: alsa output device.
    @param freq: frequency of the generated sine tone.
    @param duration: duration of the generated sine tone.
    @param sample_size: output audio sample size. Default to 16.
    """
    sox_args = get_play_sine_args(channel, odev, freq, duration, sample_size)
    command_line = ' '.join(sox_args)
    utils.system(command_line)
273
274# Functions to compose customized sox command, execute it and process the
275# output of sox command.
def get_sox_mixer_cmd(infile, channel,
                      num_channels=_DEFAULT_NUM_CHANNELS,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Builds the sox command that mixes infile down to one channel.

    The command reads infile as two-channel audio ('-c 2' is fixed in the
    command regardless of num_channels) and writes one channel to stdout.

    @param infile: Input file name.
    @param channel: The selected channel to take effect.
    @param num_channels: The number of total channels to test.
    @param sox_format: Format to generate sox command.

    @return The sox command line as a string.
    """
    # One pan value per channel index: '1' for the selected channel and '0'
    # for the others. Index 0 is always present, even if num_channels < 1.
    pan_indices = [0] + list(range(1, num_channels))
    pan_values = ','.join(
            '1' if channel == index else '0' for index in pan_indices)

    command_fields = (SOX_PATH, sox_format, infile, sox_format, pan_values)
    return '%s -c 2 %s %s -c 1 %s - mixer %s' % command_fields
299
def sox_stat_output(infile, channel,
                    num_channels=_DEFAULT_NUM_CHANNELS,
                    sox_format=_DEFAULT_SOX_FORMAT):
    """Runs sox stat on one channel of infile and returns its output.

    The mixer command from get_sox_mixer_cmd is piped into a second sox
    invocation that runs the 'stat' effect with stderr redirected to stdout.

    @param infile: Input file name.
    @param channel: The selected channel.
    @param num_channels: The number of total channels to test.
    @param sox_format: Format to generate sox command.

    @return The output of sox stat command
    """
    mixer_stage = get_sox_mixer_cmd(infile, channel,
                                    num_channels, sox_format)
    stat_stage = '%s -c 1 %s - -n stat 2>&1' % (SOX_PATH, sox_format)
    pipeline = ' | '.join([mixer_stage, stat_stage])
    return utils.system_output(pipeline, retain_output=True)
317
def get_audio_rms(sox_output):
    """Parses the RMS amplitude out of sox stat output.

    @param sox_output: Output of sox stat command.

    @return The RMS value of the first matching line as a float, or None
        if no line matches.
    """
    candidates = (_SOX_RMS_AMPLITUDE_RE.match(line)
                  for line in sox_output.split('\n'))
    for found in candidates:
        if found is not None:
            return float(found.group(1))
    return None
329
def get_rough_freq(sox_output):
    """Parses the rough frequency out of sox stat output.

    @param sox_output: Output of sox stat command.

    @return The rough frequency of the first matching line as an int, or
        None if no line matches.
    """
    candidates = (_SOX_ROUGH_FREQ_RE.match(line)
                  for line in sox_output.split('\n'))
    for found in candidates:
        if found is not None:
            return int(found.group(1))
    return None
341
def check_audio_rms(sox_output, sox_threshold=_DEFAULT_SOX_RMS_THRESHOLD):
    """Verifies that the RMS value in sox stat output reaches the threshold.

    @param sox_output: The output from sox stat command.
    @param sox_threshold: The threshold to test RMS value against.

    @raises error.TestError if RMS amplitude can't be parsed.
    @raises error.TestFail if the RMS amplitude of the recording is below
            the threshold.
    """
    measured_rms = get_audio_rms(sox_output)

    # No RMS line could be parsed from the output.
    if measured_rms is None:
        raise error.TestError(
            'Failed to generate an audio RMS value from playback.')

    logging.info('Got audio RMS value of %f. Minimum pass is %f.',
                 measured_rms, sox_threshold)

    too_quiet = measured_rms < sox_threshold
    if too_quiet:
        failure = ('Audio RMS value %f too low. Minimum pass is %f.' %
                   (measured_rms, sox_threshold))
        raise error.TestFail(failure)
365
def noise_reduce_file(in_file, noise_file, out_file,
                      sox_format=_DEFAULT_SOX_FORMAT):
    """Noise-reduces in_file with sox using the profile of noise_file.

    A sox 'noiseprof' command on noise_file is piped into a sox 'noisered'
    command that reads in_file and writes out_file.

    @param in_file: The file to noise reduce.
    @param noise_file: The file containing the noise profile.
        This can be created by recording silence.
    @param out_file: The file that receives the noise reduced sound.
    @param sox_format: The sox format to generate sox command.
    """
    profile_stage = '%s -c 2 %s %s -n noiseprof' % (
            SOX_PATH, sox_format, noise_file)
    reduce_stage = '%s -c 2 %s %s -c 2 %s %s noisered' % (
            SOX_PATH, sox_format, in_file, sox_format, out_file)
    pipeline = ' | '.join([profile_stage, reduce_stage])
    utils.system(pipeline)
384
def record_sample(tmpfile, record_command=_DEFAULT_REC_COMMAND):
    """Runs the record command with tmpfile appended as its last argument.

    @param tmpfile: The file to record to.
    @param record_command: The command to record audio.
    """
    command_line = '%s %s' % (record_command, tmpfile)
    utils.system(command_line)
392
def create_wav_file(wav_dir, prefix=""):
    """Builds a timestamped wav file path inside wav_dir.

    Only the path is composed; no file is created on disk. The file name
    is '<prefix>-<current time.time() value>.wav'.

    @param wav_dir: The directory of created wav file.
    @param prefix: specified file name prefix.

    @return The composed path.
    """
    timestamp = time.time()
    name_parts = (prefix, timestamp)
    return os.path.join(wav_dir, "%s-%s.wav" % name_parts)
404
def run_in_parallel(*funs):
    """Runs each callable in its own thread and waits for all to finish.

    @param funs: methods to run; each is called without arguments.
    """
    workers = [threading.Thread(target=fun) for fun in funs]
    for worker in workers:
        worker.start()
    # Block until every thread has completed.
    for worker in workers:
        worker.join()
418
def loopback_test_channels(noise_file_name, wav_dir,
                           playback_callback=None,
                           check_recorded_callback=check_audio_rms,
                           preserve_test_file=True,
                           num_channels = _DEFAULT_NUM_CHANNELS,
                           record_callback=record_sample,
                           mix_callback=None):
    """Tests loopback on all channels.

    For each channel, the record, playback and mix callbacks run in
    parallel. The recorded file is then noise-reduced with the noise file
    and the sox stat output of the reduced file is passed to
    check_recorded_callback.

    @param noise_file_name: Name of the file contains pre-recorded noise.
    @param wav_dir: The directory of created wav file.
    @param playback_callback: The callback to do the playback for
        one channel. Called with the channel index.
    @param record_callback: The callback to do the recording. Called with
        the name of the file to record to.
    @param check_recorded_callback: The callback to check recorded file.
        Called with the sox stat output of the noise-reduced recording.
    @param preserve_test_file: Retain the recorded files for future debugging.
    @param num_channels: The number of total channels to test.
    @param mix_callback: The callback to do on the one-channel file. Called
        with the name of the mix file.
    """
    # range() works on both Python 2 and 3 (xrange is Python 2 only).
    for channel in range(num_channels):
        record_file_name = create_wav_file(wav_dir,
                                           "record-%d" % channel)
        # Loop values are bound as lambda defaults so each callable keeps
        # the values of this iteration instead of late-binding the names.
        functions = [lambda name=record_file_name: record_callback(name)]

        if playback_callback:
            functions.append(
                    lambda index=channel: playback_callback(index))

        if mix_callback:
            mix_file_name = create_wav_file(wav_dir, "mix-%d" % channel)
            functions.append(
                    lambda name=mix_file_name: mix_callback(name))

        run_in_parallel(*functions)

        if mix_callback:
            sox_output_mix = sox_stat_output(mix_file_name, channel)
            rms_val_mix = get_audio_rms(sox_output_mix)
            logging.info('Got mixed audio RMS value of %f.', rms_val_mix)

        sox_output_record = sox_stat_output(record_file_name, channel)
        rms_val_record = get_audio_rms(sox_output_record)
        logging.info('Got recorded audio RMS value of %f.', rms_val_record)

        reduced_file_name = create_wav_file(wav_dir,
                                            "reduced-%d" % channel)
        noise_reduce_file(record_file_name, noise_file_name,
                          reduced_file_name)

        sox_output_reduced = sox_stat_output(reduced_file_name, channel)

        # Remove the files before the check so that a failing check does
        # not leave them behind.
        if not preserve_test_file:
            os.unlink(reduced_file_name)
            os.unlink(record_file_name)
            if mix_callback:
                os.unlink(mix_file_name)

        check_recorded_callback(sox_output_reduced)
475
476
def get_channel_sox_stat(
        input_audio, channel_index, channels=2, bits=16, rate=48000):
    """Gets the sox stat info of the selected channel in the input audio file.

    For single-channel input the stat is taken from the file directly.
    Otherwise the selected channel is extracted by one sox process and
    piped into a second sox process that computes the stat.

    @param input_audio: The input audio file to be analyzed.
    @param channel_index: The index of the channel to be analyzed.
                          (1 for the first channel).
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return The stat as returned by sox_utils.get_stat or
            sox_utils.parse_stat_output.

    @raises ValueError if channel_index is not within 1..channels.
    """
    if channel_index <= 0 or channel_index > channels:
        raise ValueError('incorrect channel_index: %d' % channel_index)

    if channels == 1:
        return sox_utils.get_stat(
                input_audio, channels=channels, bits=bits, rate=rate)

    p1 = cmd_utils.popen(
            sox_utils.extract_channel_cmd(
                    input_audio, '-', channel_index,
                    channels=channels, bits=bits, rate=rate),
            stdout=cmd_utils.PIPE)
    p2 = cmd_utils.popen(
            sox_utils.stat_cmd('-', channels=1, bits=bits, rate=rate),
            stdin=p1.stdout, stderr=cmd_utils.PIPE)
    # The stat text is read from the stderr of the second process.
    stat_output = p2.stderr.read()
    cmd_utils.wait_and_check_returncode(p1, p2)
    return sox_utils.parse_stat_output(stat_output)
506
507
def get_rms(input_audio, channels=1, bits=16, rate=48000):
    """Gets the RMS values of all channels of the input audio.

    @param input_audio: The input audio file to be checked.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return A list with the 'rms' attribute of each channel's sox stat, in
            channel order.
    """
    # Channel indices are 1-based for get_channel_sox_stat. range() works
    # on both Python 2 and 3 (xrange is Python 2 only).
    stats = [get_channel_sox_stat(
            input_audio, index, channels=channels, bits=bits,
            rate=rate) for index in range(1, channels + 1)]

    logging.info('sox stat: %s', [str(s) for s in stats])
    return [s.rms for s in stats]
522
523
def reduce_noise_and_get_rms(
        input_audio, noise_file, channels=1, bits=16, rate=48000):
    """Noise-reduces the input audio and gets the RMS of every channel.

    The noise profile command of noise_file is piped into the noise reduce
    command, which writes to a temporary file. The RMS values are read from
    that file before it is removed.

    @param input_audio: The input audio file to be analyzed.
    @param noise_file: The noise file used to reduce noise in the input audio.
    @param channels: The number of channels in the input audio.
    @param bits: The number of bits of each audio sample.
    @param rate: The sampling rate.

    @return The result of get_rms on the noise-reduced audio.
    """
    with tempfile.NamedTemporaryFile() as reduced_file:
        profile_cmd = sox_utils.noise_profile_cmd(
                noise_file, '-', channels=channels, bits=bits, rate=rate)
        profiler = cmd_utils.popen(profile_cmd, stdout=cmd_utils.PIPE)

        reduce_cmd = sox_utils.noise_reduce_cmd(
                input_audio, reduced_file.name, '-',
                channels=channels, bits=bits, rate=rate)
        reducer = cmd_utils.popen(reduce_cmd, stdin=profiler.stdout)

        cmd_utils.wait_and_check_returncode(profiler, reducer)
        return get_rms(reduced_file.name, channels, bits, rate)
548
549
def skip_devices_to_test(*boards):
    """Skips the test on boards with hardware or test compatibility issues.

    @param boards: the boards to skip testing.

    @raises error.TestNAError if the current board is one of the boards.
    """
    # TODO(scottz): Remove this when crbug.com/220147 is fixed.
    current_board = utils.get_current_board()
    if current_board not in boards:
        return
    raise error.TestNAError('This test is not available on %s' % current_board)
559
560
def cras_rms_test_setup():
    """Sets up CRAS for the cras_rms_tests.

    To make sure the line_out-to-mic_in path is all green: sets the system
    and selected output node volume and the capture gain to their defaults,
    then unmutes both system output and capture.
    """
    # TODO(owenlin): Now, the nodes are chosen by chrome.
    #                We should do it here.
    playback_volume = _DEFAULT_PLAYBACK_VOLUME
    cras_utils.set_system_volume(playback_volume)
    cras_utils.set_selected_output_node_volume(playback_volume)

    capture_gain = _DEFAULT_CAPTURE_GAIN
    cras_utils.set_capture_gain(capture_gain)

    muted = False
    cras_utils.set_system_mute(muted)
    cras_utils.set_capture_mute(muted)
575
576
def generate_rms_postmortem():
    """Logs a postmortem report for rms tests.

    Logs the loopback dongle status and the audio diagnostics. Any
    exception raised while doing so is logged and not propagated.
    """
    try:
        logging.info('audio postmortem report')
        log_loopback_dongle_status()
        diagnostics = get_audio_diagnostics()
        logging.info(diagnostics)
    except Exception:
        logging.exception('Error while generating postmortem report')
585
586
def get_audio_diagnostics():
    """Runs the audio diagnostics executable with its stdout piped.

    @returns: the result of cmd_utils.execute for the command; presumably
              a string containing diagnostic results.

    """
    diagnostics_command = [_AUDIO_DIAGNOSTICS_PATH]
    return cmd_utils.execute(diagnostics_command, stdout=cmd_utils.PIPE)
594
595
def get_max_cross_correlation(signal_a, signal_b):
    """Gets max cross-correlation and best time delay of two signals.

    The steps are:
      1. Reject signals containing non-float elements or whose L2 norm is
         not above _MINIMUM_NORM.
      2. Compute the full cross-correlation Cxy of the two signals with
         numpy.correlate. Refer to manual of numpy.correlate for detail
         of correlation.
      3. Find the maximum value C_max in Cxy and the first index C_index
         where it occurs.
      4. Divide C_max by norm(X)*norm(Y) to get max cross-correlation.

    Max cross-correlation indicates the similarity of X and Y. The value
    is 1 if X equals Y multiplied by a positive scalar.
    Any constant level shift will be regarded as distortion and will make
    max cross-correlation value deviated from 1.
    C_index is the best time delay of Y that make Y looks similar to X.
    Refer to http://en.wikipedia.org/wiki/Cross-correlation.

    @param signal_a: A list of numbers which contains the first signal.
    @param signal_b: A list of numbers which contains the second signal.

    @raises: ValueError if any number in signal_a or signal_b is not a float.
             ValueError if norm of any array is less than or equal to
             _MINIMUM_NORM.

    @returns: A tuple (correlation index, best delay). If there are more than
              one best delay, just return the first one.
    """
    # Every element of both signals must be a float.
    for signal in (signal_a, signal_b):
        if not all(isinstance(sample, float) for sample in signal):
            raise ValueError('List contains number which is not a float')

    norm_a = numpy.linalg.norm(signal_a)
    norm_b = numpy.linalg.norm(signal_b)
    logging.debug('norm_a: %f', norm_a)
    logging.debug('norm_b: %f', norm_b)
    if norm_a <= _MINIMUM_NORM or norm_b <= _MINIMUM_NORM:
        raise ValueError('No meaningful data as norm is too small.')

    correlation = numpy.correlate(signal_a, signal_b, 'full')
    peak = max(correlation)
    peak_positions = [
            position for position, value in enumerate(correlation)
            if value == peak]
    if len(peak_positions) > 1:
        logging.warning('There are more than one best delay: %r',
                        peak_positions)

    normalized_peak = peak / (norm_a * norm_b)
    return normalized_peak, peak_positions[0]
654
655
def trim_data(data, threshold=0):
    """Trims leading and trailing elements that are too small.

    Removes elements in head and tail whose absolute value is smaller than
    or equal to threshold. Elements between the first and the last kept
    element are never removed.
    E.g. trim_data([0.0, 0.1, 0.2, 0.3, 0.2, 0.1, 0.0], 0.2) =
    ([0.3], 3)

    @param data: A list of numbers.
    @param threshold: The threshold to compare against.

    @returns: A tuple (trimmed_data, end_trimmed_length), where
              end_trimmed_length is the length of original data being trimmed
              from the end.
              Returns ([], None) if there is no valid data.
    """
    significant = [
            position for position, value in enumerate(data)
            if abs(value) > threshold]
    if not significant:
        logging.warning(
                'There is no element with absolute value greater '
                'than threshold %f', threshold)
        return [], None

    first, last = significant[0], significant[-1]
    logging.debug('Start and end of indice_valid: %d, %d', first, last)

    tail_length = len(data) - last - 1
    logging.debug('Trimmed length in the end: %d', tail_length)
    return (data[first : last + 1], tail_length)
684
685
def get_one_channel_correlation(test_data, golden_data):
    """Gets max cross-correlation of test_data and golden_data.

    Trims test data and compute the max cross-correlation against golden_data.
    Signal can be trimmed because those zero values in the head and tail of
    a signal will not affect correlation computation.

    @param test_data: A list containing the data to compare against golden data.
    @param golden_data: A list containing the golden data.

    @returns: A tuple (max cross-correlation, best_delay) if data is valid.
              Otherwise returns (None, None), which includes the case where
              trimming leaves no test data. Refer to docstring of
              get_max_cross_correlation.

    @raises: ValueError from get_max_cross_correlation, e.g. when the norm
             of golden_data is too small.
    """
    trimmed_test_data, end_trimmed_length = trim_data(test_data)

    # trim_data reports a trimmed length of None when nothing is left after
    # trimming. Return the documented "invalid data" result instead of
    # letting the correlation of an empty signal raise.
    if end_trimmed_length is None:
        return None, None

    def to_float(samples):
        """Casts elements in the list to float.

        @param samples: A list of numbers.

        @returns: A list of original numbers casted to float.
        """
        return [float(x) for x in samples]

    max_cross_correlation, best_delay = get_max_cross_correlation(
            to_float(golden_data),
            to_float(trimmed_test_data))

    # The reason to add back the trimmed length in the end.
    # E.g.:
    # golden data:
    #
    # |-----------vvvv----------------|  vvvv is the signal of interest.
    #       a                 b
    #
    # test data:
    #
    # |---x----vvvv--------x----------------|  x is the place to trim.
    #   c   d         e            f
    #
    # trimmed test data:
    #
    # |----vvvv--------|
    #   d         e
    #
    # The first output of cross correlation computation :
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    # |----vvvv--------|
    #   d         e
    #
    # The largest output of cross correlation computation happens at
    # delay a + e.
    #
    #                  |-----------vvvv----------------|
    #                       a                 b
    #
    #                         |----vvvv--------|
    #                           d         e
    #
    # Cross correlation starts computing by aligning the last sample
    # of the trimmed test data to the first sample of golden data.
    # The best delay calculated from trimmed test data and golden data
    # cross correlation is e + a. But the real best delay that should be
    # identical on two channel should be e + a + f.
    # So we need to add back the length being trimmed in the end.

    # A correlation of exactly zero is treated as no valid result.
    if max_cross_correlation:
        return max_cross_correlation, best_delay + end_trimmed_length
    else:
        return None, None
761
762
def compare_one_channel_correlation(test_data, golden_data, parameters):
    """Compares two one-channel data by correlation.

    @param test_data: A list containing the data to compare against golden data.
    @param golden_data: A list containing the golden data.
    @param parameters: A dict containing parameters for method. The optional
                       key 'correlation_threshold' overrides
                       _CORRELATION_INDEX_THRESHOLD.

    @returns: A dict containing:
              index: The index of similarity where 1 means they are different
                  only by a positive scale.
              best_delay: The best delay of test data in relative to golden
                  data.
              equal: A bool, True only if index is truthy and greater than
                  the threshold.
    """
    threshold = parameters.get(
            'correlation_threshold', _CORRELATION_INDEX_THRESHOLD)

    correlation_index, best_delay = get_one_channel_correlation(
            test_data, golden_data)

    result_dict = {
        'index': correlation_index,
        'best_delay': best_delay,
        'equal': bool(correlation_index and correlation_index > threshold),
    }
    logging.debug('result_dict: %r', result_dict)
    return result_dict
792
793
def compare_data_correlation(golden_data_binary, golden_data_format,
                             test_data_binary, test_data_format,
                             channel_map, parameters=None):
    """Compares two raw data using correlation.

    Each mapped test channel is compared against its golden channel with
    compare_one_channel_correlation. The comparison fails if any channel is
    not equal, or if the channels do not all share the same best delay.

    @param golden_data_binary: The binary containing golden data.
    @param golden_data_format: The data format of golden data.
    @param test_data_binary: The binary containing test data.
    @param test_data_format: The data format of test data.
    @param channel_map: A list containing channel mapping.
                        E.g. [1, 0, None, None, None, None, None, None] means
                        channel 0 of test data should map to channel 1 of
                        golden data. Channel 1 of test data should map to
                        channel 0 of golden data. Channel 2 to 7 of test data
                        should be skipped.
    @param parameters: A dict containing parameters for method, if needed.

    @raises: NotImplementedError if file type is not raw.
             NotImplementedError if sampling rates of two data are not the same.
             error.TestFail if golden data and test data are not equal.
             error.TestFail if the compared channels have different best
             delays.
    """
    if parameters is None:
        parameters = dict()

    if (golden_data_format['file_type'] != 'raw' or
        test_data_format['file_type'] != 'raw'):
        raise NotImplementedError('Only support raw data in compare_data.')
    if golden_data_format['rate'] != test_data_format['rate']:
        raise NotImplementedError(
                'Only support comparing data with the same sampling rate')
    golden_data = audio_data.AudioRawData(
            binary=golden_data_binary,
            channel=golden_data_format['channel'],
            sample_format=golden_data_format['sample_format'])
    test_data = audio_data.AudioRawData(
            binary=test_data_binary,
            channel=test_data_format['channel'],
            sample_format=test_data_format['sample_format'])
    compare_results = []
    for test_channel, golden_channel in enumerate(channel_map):
        if golden_channel is None:
            logging.info('Skipped channel %d', test_channel)
            continue
        test_data_one_channel = test_data.channel_data[test_channel]
        golden_data_one_channel = golden_data.channel_data[golden_channel]
        result_dict = dict(test_channel=test_channel,
                           golden_channel=golden_channel)
        result_dict.update(
                compare_one_channel_correlation(
                        test_data_one_channel, golden_data_one_channel,
                        parameters))
        compare_results.append(result_dict)
    logging.info('compare_results: %r', compare_results)
    for result in compare_results:
        if not result['equal']:
            # The index is None when no correlation was found. Formatting
            # None with %f raises TypeError, which would hide the intended
            # TestFail, so None is rendered as text instead.
            index = result['index']
            if index is None:
                index_text = 'None'
            else:
                index_text = '%f' % index
            error_msg = ('Failed on test channel %d and golden channel %d with '
                         'index %s') % (
                                 result['test_channel'],
                                 result['golden_channel'],
                                 index_text)
            logging.error(error_msg)
            raise error.TestFail(error_msg)
    # Also checks best delay are exactly the same.
    best_delays = set(result['best_delay'] for result in compare_results)
    if len(best_delays) > 1:
        error_msg = 'There are more than one best delay: %s' % best_delays
        logging.error(error_msg)
        raise error.TestFail(error_msg)
862
863
class _base_rms_test(test.test):
    """Common base class shared by the rms_test classes."""

    def postprocess(self):
        """Runs the parent postprocess, then reports on failed constraints.

        generate_rms_postmortem() is called when at least one failed
        constraint was recorded in self.failed_constraints.
        """
        super(_base_rms_test, self).postprocess()

        # Total number of failed constraints over all iterations.
        failure_total = sum(map(len, self.failed_constraints))
        if failure_total:
            generate_rms_postmortem()
873
874
class chrome_rms_test(_base_rms_test):
    """Base test class for audio RMS test with Chrome.

    The chrome instance can be accessed by self.chrome.
    """
    def warmup(self):
        """Creates the Chrome instance and runs the CRAS RMS test setup.

        If cras_rms_test_setup() raises, the browser of the Chrome instance
        created here is closed and the exception is re-raised.
        """
        skip_devices_to_test('x86-mario')
        super(chrome_rms_test, self).warmup()

        # Not all client of this file using telemetry.
        # Just do the import here for those who really need it.
        from autotest_lib.client.common_lib.cros import chrome

        self.chrome = chrome.Chrome(init_network_controller=True)

        # The audio configuration could be changed when we
        # restart chrome.
        try:
            cras_rms_test_setup()
        except Exception:
            self.chrome.browser.Close()
            raise


    def cleanup(self, *args):
        """Closes the Chrome instance, then runs the parent cleanup.

        @param args: Accepted for compatibility with callers; not used.
        """
        try:
            # warmup() can raise before self.chrome is assigned. Guard the
            # lookup so cleanup does not raise AttributeError in that case.
            chrome_instance = getattr(self, 'chrome', None)
            if chrome_instance is not None:
                chrome_instance.close()
        finally:
            super(chrome_rms_test, self).cleanup()
904
class cras_rms_test(_base_rms_test):
    """Base class for audio RMS tests that run on CRAS."""

    def warmup(self):
        """Skips 'x86-mario', runs the parent warmup, then the CRAS setup."""
        # The device check comes first, before any setup work is done.
        skip_devices_to_test('x86-mario')
        super(cras_rms_test, self).warmup()
        cras_rms_test_setup()
912
913
def alsa_rms_test_setup():
    """Configures ALSA mixer controls for alsa_rms_test.

    Mixer controls vary between boards/chipsets, and controls with the same
    name may have different capabilities on different boards. This picks a
    list of mixer commands for the current board class (with special cases
    for certain boards) and applies them to the first sound card that has
    the 'Mic Jack' control.
    """
    card_id = alsa_utils.get_first_soundcard_with_control('Mic Jack', 'Mic')
    arch = utils.get_arch()
    board = utils.get_board()
    uses_max98090 = os.path.exists('/sys/module/snd_soc_max98090')

    if board in ('daisy_spring', 'daisy_skate'):
        # The MIC controls of the boards do not support dB syntax.
        mixer_commands = [
            'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME,
            'sset MIC1 ' + _DEFAULT_ALSA_MAX_VOLUME,
            'sset MIC2 ' + _DEFAULT_ALSA_MAX_VOLUME,
        ]
    elif uses_max98090 or arch in ('armv7l', 'aarch64'):
        # ARM platforms or Intel platforms that uses max98090 codec driver.
        mixer_commands = [
            'sset Headphone ' + _DEFAULT_ALSA_MAX_VOLUME,
            'sset MIC1 ' + _DEFAULT_ALSA_CAPTURE_GAIN,
            'sset MIC2 ' + _DEFAULT_ALSA_CAPTURE_GAIN,
        ]
    else:
        # The rest of Intel platforms.
        mixer_commands = [
            'sset Master ' + _DEFAULT_ALSA_MAX_VOLUME,
            'sset Capture ' + _DEFAULT_ALSA_CAPTURE_GAIN,
        ]

    # Apply the selected commands in order.
    for command in mixer_commands:
        alsa_utils.mixer_cmd(card_id, command)
943
944
class alsa_rms_test(_base_rms_test):
    """Base class for audio RMS tests that run directly on ALSA."""

    def warmup(self):
        """Skips 'x86-mario', runs the parent warmup, then the ALSA setup."""
        skip_devices_to_test('x86-mario')
        super(alsa_rms_test, self).warmup()

        # Set the mixer controls once the parent warmup has run.
        alsa_rms_test_setup()
953