1from multiprocessing import Process
2import time
4from acts_contrib.test_utils.bt.A2dpBaseTest import A2dpBaseTest
6END_TOKEN = "end"
9class BtInterferenceRSSITest(A2dpBaseTest):
10    """Test that streams audio from Android phone to relay controlled headset
11    over Bluetooth while running command sequences on one or more attenuators.
13    The command sequences are passed in a config parameter "bt_atten_sequences".
15    bt_atten_sequences: a dictionary mapping attenuator device index to a list
16        of action strings. These action strings are specific to this test and
17        take the form "<action>:<param>" where <action> corresponds to a method
18        name for this test class and <param> is the single value arg to pass to
19        that method.
20        Example: {"0": ["set:50", "wait_seconds:2", "set:0", "wait_seconds:2"],
21                  "1": ["set:10", "end"]}
23            The above dictionary would toggle attenuator at 0 to 50 dB for 2 sec
24            and back to 0 dB for 2 sec in a loop, while simultaneously setting
25            attenuator at 1 to 10dB. Passing the END_TOKEN (defined in this
26            module) terminates the sequence and keeps it from looping.
27    """
29    def setup_class(self):
30        super().setup_class()
31        req_params = ["bt_atten_sequences", "RelayDevice", "codec"]
32        opt_params = ["audio_params"]
33        self.unpack_userparams(req_params, opt_params)
35    def set(self, attenuator, value):
36        self.log.debug("Set attenuator %s to %s" % (attenuator.idx, value))
37        attenuator.set_atten(int(value))
39    def wait_seconds(self, attenuator, value):
40        self.log.debug("Attenuator %s wait for %s seconds" %
41                       (attenuator.idx, value))
42        time.sleep(float(value))
44    def atten_sequence_worker(self, attenuator, action_sequence):
45        while True:
46            for action in action_sequence:
47                if action == END_TOKEN:  # Stop the sequence and don't loop
48                    return
49                method, value = action.split(":")
50                getattr(self, method)(attenuator, value)
52    def set_all_attenuators(self, value):
53        for attenuator in self.attenuators:
54            attenuator.set_atten(value)
56    def teardown_class(self):
57        self.set_all_attenuators(0)
58        super().teardown_class()
60    def test_multi_atten_streaming(self):
61        self.set_all_attenuators(0)
62        # Connect phone and headset before setting any attenuators.
63        self.connect_with_retries(retries=5)
64        # Create processes for streaming and attenuating to run in parallel.
65        stream_and_record_proc = Process(target=self.stream_music_on_codec,
66                                         kwargs=self.codec)
67        attenuation_processes = []
68        for channel, action_sequence in self.bt_atten_sequences.items():
69            process = Process(target=self.atten_sequence_worker,
70                              args=(self.attenuators[int(channel)],
71                                    action_sequence))
72            attenuation_processes.append(process)
73        stream_and_record_proc.start()
74        for process in attenuation_processes:
75            process.start()
76        stream_and_record_proc.join()
77        for process in attenuation_processes:
78            process.terminate()