1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6
7from autotest_lib.client.common_lib import error
8from autotest_lib.server.cros.network import attenuator
9from autotest_lib.server.cros.network import attenuator_hosts
10
11from chromite.lib import timeout_util
12
# Fixed path loss table, aliased from attenuator_hosts.  AttenuatorController
# indexes it by attenuator hostname, then by attenuator number, then by
# frequency to obtain the fixed loss that is subtracted from a requested total
# attenuation.
HOST_TO_FIXED_ATTENUATIONS = attenuator_hosts.HOST_FIXED_ATTENUATIONS
14
15
class AttenuatorController(object):
    """Represents a minicircuits variable attenuator.

    This device is used to vary the attenuation between a router and a client.
    This allows us to measure throughput as a function of signal strength and
    test some roaming situations.  The throughput vs signal strength tests
    are referred to as rate vs range (RvR) tests in places.

    The total attenuation on a line is treated as the sum of a fixed,
    per-frequency loss (looked up in HOST_TO_FIXED_ATTENUATIONS) and the
    variable attenuation programmed into the device.

    """

    @property
    def supported_attenuators(self):
        """@return iterable of int attenuators supported on this host."""
        return self._fixed_attenuations.keys()


    def __init__(self, hostname):
        """Construct a AttenuatorController.

        Opens the attenuator and resets its variable attenuation to 0 dB.

        @param hostname: Hostname representing minicircuits attenuator.
        @raises error.TestError: if hostname has no fixed attenuation table.

        """
        self.hostname = hostname
        super(AttenuatorController, self).__init__()
        # Membership is tested on the mapping itself; building a key list
        # first is unnecessary.
        if hostname not in HOST_TO_FIXED_ATTENUATIONS:
            raise error.TestError('Unexpected RvR host name %r.' % hostname)
        self._fixed_attenuations = HOST_TO_FIXED_ATTENUATIONS[hostname]
        num_atten = len(self.supported_attenuators)

        self._attenuator = attenuator.Attenuator(hostname, num_atten)
        self.set_variable_attenuation(0)


    def _affected_attenuators(self, attenuator_num):
        """Resolve an optional attenuator number to the attenuators to change.

        @param attenuator_num: int attenuator, or None for all of them.
        @returns iterable of int attenuator numbers.

        """
        if attenuator_num is None:
            return self.supported_attenuators
        return [attenuator_num]


    def _set_and_verify_attenuation(self, attenuator_num, atten_db):
        """Program one attenuator and read the value back to confirm it.

        @param attenuator_num: int attenuator to change.
        @param atten_db: int level of variable attenuation in dB.
        @raises error.TestError: if the value read back differs.

        """
        self._attenuator.set_atten(attenuator_num, atten_db)
        if int(self._attenuator.get_atten(attenuator_num)) != atten_db:
            raise error.TestError('Attenuation did not set as expected '
                                  'on attenuator %d' % attenuator_num)


    def _approximate_frequency(self, attenuator_num, freq):
        """Finds an approximate frequency to freq.

        In case freq is not present in self._fixed_attenuations, we use a value
        from a nearby channel as an approximation.

        @param attenuator_num: attenuator in question on the remote host.  Each
                attenuator has a different fixed path loss per frequency.
        @param freq: int frequency in MHz.
        @returns int approximate frequency from self._fixed_attenuations.
        @raises error.TestError: if the attenuator has no frequencies defined.

        """
        defined_freqs = self._fixed_attenuations[attenuator_num]
        if not defined_freqs:
            raise error.TestError('No fixed attenuations defined for '
                                  'attenuator %r.' % attenuator_num)
        # min() keeps the first of several equally close frequencies, which
        # matches a strict "closer than the best so far" scan.
        approx_freq = min(defined_freqs,
                          key=lambda defined_freq: abs(defined_freq - freq))

        logging.debug('Approximating attenuation for frequency %d with '
                      'constants for frequency %d.', freq, approx_freq)
        return approx_freq


    def close(self):
        """Close variable attenuator connection."""
        self._attenuator.close()


    def set_total_attenuation(self, atten_db, frequency_mhz,
                              attenuator_num=None):
        """Set the total attenuation on one or all attenuators.

        @param atten_db: int level of attenuation in dB.  This must be
                higher than the fixed attenuation level of the affected
                attenuators.
        @param frequency_mhz: int frequency for which to calculate the
                total attenuation.  The fixed component of attenuation
                varies with frequency.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.

        """
        for atten in self._affected_attenuators(attenuator_num):
            freq_to_fixed_loss = self._fixed_attenuations[atten]
            approx_freq = self._approximate_frequency(atten,
                                                      frequency_mhz)
            # The device only controls the variable part, so the fixed loss
            # is taken out of the requested total.
            variable_atten_db = atten_db - freq_to_fixed_loss[approx_freq]
            self.set_variable_attenuation(variable_atten_db,
                                          attenuator_num=atten)


    def set_variable_attenuation(self, atten_db, attenuator_num=None):
        """Set the variable attenuation on one or all attenuators.

        Each value is read back after being set.  On a TestError the
        connection is reopened and the operation is retried once; a second
        failure propagates to the caller.

        @param atten_db: int non-negative level of attenuation in dB.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.
        @raises error.TestError: if the retry also fails.

        """
        for atten in self._affected_attenuators(attenuator_num):
            try:
                self._set_and_verify_attenuation(atten, atten_db)
            except error.TestError:
                self._attenuator.reopen(self.hostname)
                self._set_and_verify_attenuation(atten, atten_db)
            logging.info('%ddb attenuation set successfully on attenautor %d',
                         atten_db, atten)


    def get_minimal_total_attenuation(self):
        """Get attenuator's maximum fixed attenuation value.

        This is pulled from the current attenuator's lines and becomes the
        minimal total attenuation when stepping through attenuation levels.

        @return maximum starting attenuation value, or 0 if no fixed
                attenuation values are defined.

        """
        max_atten = 0
        for freq_to_fixed_loss in self._fixed_attenuations.values():
            # An attenuator without any defined frequency contributes nothing.
            if freq_to_fixed_loss:
                max_atten = max(max_atten, max(freq_to_fixed_loss.values()))
        return max_atten


    def set_signal_level(self, client_context, requested_sig_level,
            min_sig_level_allowed=-85, tolerance_percent=3, timeout=240):
        """Set wifi signal to desired level by changing attenuation.

        The variable attenuation is stepped by 1 dB per iteration, with the
        client reassociating before each measurement, until the measured level
        falls within the tolerance of the requested level.

        @param client_context: Client context object.
        @param requested_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param min_sig_level_allowed: Minimum signal level allowed; this is to
                ensure that we don't set a signal that is too weak and DUT can
                not associate.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.
        @param timeout: Value passed to timeout_util.Timeout to bound the
                whole adjustment loop.
        @raises error.TestError: if no signal is detected, the requested level
                is outside the allowed bounds, or the level could not be
                reached before the timeout.
        """
        # NOTE(review): the step counter starts at 0 and the error message
        # below mentions "0db attenuation", but nothing here resets the device
        # first -- presumably callers invoke this with no variable attenuation
        # applied; confirm against callers.
        atten_db = 0
        starting_sig_level = client_context.wifi_signal_level
        if not starting_sig_level:
            raise error.TestError("No signal detected.")
        if not (min_sig_level_allowed <= requested_sig_level <=
                starting_sig_level):
            raise error.TestError("Requested signal level (%d) is either "
                                  "higher than current signal level (%r) with "
                                  "0db attenuation or lower than minimum "
                                  "signal level (%d) allowed." %
                                  (requested_sig_level,
                                  starting_sig_level,
                                  min_sig_level_allowed))

        try:
            with timeout_util.Timeout(timeout):
                while True:
                    client_context.reassociate(timeout_seconds=1)
                    current_sig_level = client_context.wifi_signal_level
                    logging.info("Current signal level %r", current_sig_level)
                    if not current_sig_level:
                        raise error.TestError("No signal detected.")
                    if self.signal_in_range(requested_sig_level,
                            current_sig_level, tolerance_percent):
                        logging.info("Signal level set to %r.",
                                     current_sig_level)
                        break
                    # Adjust first, then apply: applying the stale value and
                    # adjusting afterwards makes the device lag one step
                    # behind and move the wrong way after a reversal.
                    if current_sig_level > requested_sig_level:
                        atten_db += 1
                        self.set_variable_attenuation(atten_db)
                    elif current_sig_level < requested_sig_level:
                        # Variable attenuation cannot go below 0 dB.
                        atten_db = max(atten_db - 1, 0)
                        self.set_variable_attenuation(atten_db)
        except (timeout_util.TimeoutError, error.TestError,
                error.TestFail) as e:
            raise error.TestError("Not able to set wifi signal to requested "
                                  "level. \n%s" % e)


    def signal_in_range(self, req_sig_level, curr_sig_level, tolerance_percent):
        """Check if wifi signal is within the threshold of requested signal.

        @param req_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param curr_sig_level: Current wifi signal level seen by the DUT.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.

        @returns True if wifi signal is in the desired range.
        """
        # Take the magnitude so the window is req +/- tolerance whatever the
        # sign of the requested level; otherwise a non-negative level yields
        # an inverted, always-empty window.
        tolerance = abs(req_sig_level * tolerance_percent / 100)
        return (req_sig_level - tolerance <= curr_sig_level <=
                req_sig_level + tolerance)