1from multiprocessing import Process
2import time
3
4from acts_contrib.test_utils.bt.A2dpBaseTest import A2dpBaseTest
5
6END_TOKEN = "end"
7
8
9class BtInterferenceRSSITest(A2dpBaseTest):
10    """Test that streams audio from Android phone to relay controlled headset
11    over Bluetooth while running command sequences on one or more attenuators.
12
13    The command sequences are passed in a config parameter "bt_atten_sequences".
14
15    bt_atten_sequences: a dictionary mapping attenuator device index to a list
16        of action strings. These action strings are specific to this test and
17        take the form "<action>:<param>" where <action> corresponds to a method
18        name for this test class and <param> is the single value arg to pass to
19        that method.
20        Example: {"0": ["set:50", "wait_seconds:2", "set:0", "wait_seconds:2"],
21                  "1": ["set:10", "end"]}
22
23            The above dictionary would toggle attenuator at 0 to 50 dB for 2 sec
24            and back to 0 dB for 2 sec in a loop, while simultaneously setting
25            attenuator at 1 to 10dB. Passing the END_TOKEN (defined in this
26            module) terminates the sequence and keeps it from looping.
27    """
28
29    def setup_class(self):
30        super().setup_class()
31        req_params = ["bt_atten_sequences", "RelayDevice", "codec"]
32        opt_params = ["audio_params"]
33        self.unpack_userparams(req_params, opt_params)
34
35    def set(self, attenuator, value):
36        self.log.debug("Set attenuator %s to %s" % (attenuator.idx, value))
37        attenuator.set_atten(int(value))
38
39    def wait_seconds(self, attenuator, value):
40        self.log.debug("Attenuator %s wait for %s seconds" %
41                       (attenuator.idx, value))
42        time.sleep(float(value))
43
44    def atten_sequence_worker(self, attenuator, action_sequence):
45        while True:
46            for action in action_sequence:
47                if action == END_TOKEN:  # Stop the sequence and don't loop
48                    return
49                method, value = action.split(":")
50                getattr(self, method)(attenuator, value)
51
52    def set_all_attenuators(self, value):
53        for attenuator in self.attenuators:
54            attenuator.set_atten(value)
55
56    def teardown_class(self):
57        self.set_all_attenuators(0)
58        super().teardown_class()
59
60    def test_multi_atten_streaming(self):
61        self.set_all_attenuators(0)
62        # Connect phone and headset before setting any attenuators.
63        self.connect_with_retries(retries=5)
64        # Create processes for streaming and attenuating to run in parallel.
65        stream_and_record_proc = Process(target=self.stream_music_on_codec,
66                                         kwargs=self.codec)
67        attenuation_processes = []
68        for channel, action_sequence in self.bt_atten_sequences.items():
69            process = Process(target=self.atten_sequence_worker,
70                              args=(self.attenuators[int(channel)],
71                                    action_sequence))
72            attenuation_processes.append(process)
73        stream_and_record_proc.start()
74        for process in attenuation_processes:
75            process.start()
76        stream_and_record_proc.join()
77        for process in attenuation_processes:
78            process.terminate()