1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6
7from autotest_lib.client.common_lib import error
8from autotest_lib.client.common_lib.cros.network import ping_runner
9from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
10from autotest_lib.server import hosts
11from autotest_lib.server import site_linux_router
12from autotest_lib.server import site_linux_system
13from autotest_lib.server.cros import dnsname_mangler
14from autotest_lib.server.cros.network import attenuator_controller
15from autotest_lib.server.cros.network import wifi_client
16
17class WiFiTestContextManager(object):
18    """A context manager for state used in WiFi autotests.
19
20    Some of the building blocks we use in WiFi tests need to be cleaned up
21    after use.  For instance, we start an XMLRPC server on the client
22    which should be shut down so that the next test can start its instance.
23    It is convenient to manage this setup and teardown through a context
24    manager rather than building it into the test class logic.
25
26    """
27    CMDLINE_ATTEN_ADDR = 'atten_addr'
28    CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture'
29    CMDLINE_CONDUCTIVE_RIG = 'conductive_rig'
30    CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen'
31    CMDLINE_ROUTER_ADDR = 'router_addr'
32    CMDLINE_PACKET_CAPTURES = 'packet_capture'
33    CMDLINE_USE_WPA_CLI = 'use_wpa_cli'
34
35
36    @property
37    def attenuator(self):
38        """@return attenuator object (e.g. a BeagleBone)."""
39        if self._attenuator is None:
40            raise error.TestNAError('No attenuator available in this setup.')
41
42        return self._attenuator
43
44
45    @property
46    def client(self):
47        """@return WiFiClient object abstracting the DUT."""
48        return self._client_proxy
49
50
51    @property
52    def router(self):
53        """@return router object (e.g. a LinuxCrosRouter)."""
54        return self._router
55
56
57    @property
58    def pcap_host(self):
59        """@return Dedicated packet capture host or None."""
60        return self._pcap_host
61
62
63    @property
64    def capture_host(self):
65        """@return Dedicated pcap_host or the router itself."""
66        return self.pcap_host or self.router
67
68
69    def __init__(self, test_name, host, cmdline_args, debug_dir):
70        """Construct a WiFiTestContextManager.
71
72        Optionally can pull addresses of the server address, router address,
73        or router port from cmdline_args.
74
75        @param test_name string descriptive name for this test.
76        @param host host object representing the DUT.
77        @param cmdline_args dict of key, value settings from command line.
78
79        """
80        super(WiFiTestContextManager, self).__init__()
81        self._test_name = test_name
82        self._cmdline_args = cmdline_args.copy()
83        self._client_proxy = wifi_client.WiFiClient(
84                host, debug_dir,
85                self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False))
86        self._attenuator = None
87        self._router = None
88        self._pcap_host = None
89        self._enable_client_packet_captures = False
90        self._enable_packet_captures = False
91        self._packet_capture_snaplen = None
92
93
94    def __enter__(self):
95        self.setup()
96        return self
97
98
99    def __exit__(self, exc_type, exc_value, traceback):
100        self.teardown()
101
102
103    def _get_bool_cmdline_value(self, key, default_value):
104        """Returns a bool value for the given key from the cmdline args.
105
106        @param key string cmdline args key.
107        @param default_value value to return if the key is not specified in the
108               cmdline args.
109
110        @return True/False or default_value if key is not specified in the
111                cmdline args.
112
113        """
114        if key in self._cmdline_args:
115            value = self._cmdline_args[key].lower()
116            if value in ('1', 'true', 'yes', 'y'):
117                return True
118            else:
119                return False
120        else:
121            return default_value
122
123
124    def get_wifi_addr(self, ap_num=0):
125        """Return an IPv4 address pingable by the client on the WiFi subnet.
126
127        @param ap_num int number of AP.  Only used in stumpy cells.
128        @return string IPv4 address.
129
130        """
131        return self.router.local_server_address(ap_num)
132
133
134    def get_wifi_if(self, ap_num=0):
135        """Returns the interface name for the IP address of self.get_wifi_addr.
136
137        @param ap_num int number of AP.  Only used in stumpy cells.
138        @return string interface name "e.g. wlan0".
139
140        """
141        return self.router.get_hostapd_interface(ap_num)
142
143
144    def get_wifi_host(self):
145        """@return host object representing a pingable machine."""
146        return self.router.host
147
148
149    def configure(self, configuration_parameters, multi_interface=None,
150                  is_ibss=None):
151        """Configure a router with the given parameters.
152
153        Configures an AP according to the specified parameters and
154        enables whatever packet captures are appropriate.  Will deconfigure
155        existing APs unless |multi_interface| is specified.
156
157        @param configuration_parameters HostapConfig object.
158        @param multi_interface True iff having multiple configured interfaces
159                is expected for this configure call.
160        @param is_ibss True iff this is an IBSS endpoint.
161
162        """
163        if not self.client.is_frequency_supported(
164                configuration_parameters.frequency):
165            raise error.TestNAError('DUT does not support frequency: %s' %
166                                    configuration_parameters.frequency)
167        configuration_parameters.security_config.install_router_credentials(
168                self.router.host)
169        if is_ibss:
170            if multi_interface:
171                raise error.TestFail('IBSS mode does not support multiple '
172                                     'interfaces.')
173            if not self.client.is_ibss_supported():
174                raise error.TestNAError('DUT does not support IBSS mode')
175            self.router.ibss_configure(configuration_parameters)
176        else:
177            self.router.hostap_configure(configuration_parameters,
178                                         multi_interface=multi_interface)
179        if self._enable_client_packet_captures:
180            self.client.start_capture(configuration_parameters.frequency,
181                                      snaplen=self._packet_capture_snaplen)
182        if self._enable_packet_captures:
183           self.capture_host.start_capture(
184                    configuration_parameters.frequency,
185                    ht_type=configuration_parameters.ht_packet_capture_mode,
186                    snaplen=self._packet_capture_snaplen)
187
188
189    def setup(self):
190        """Construct the state used in a WiFi test."""
191        self._router = site_linux_router.build_router_proxy(
192                test_name=self._test_name,
193                client_hostname=self.client.host.hostname,
194                router_addr=self._cmdline_args.get(self.CMDLINE_ROUTER_ADDR,
195                                                   None))
196        ping_helper = ping_runner.PingRunner()
197        pcap_addr = dnsname_mangler.get_pcap_addr(
198                self.client.host.hostname,
199                allow_failure=True)
200        if pcap_addr and ping_helper.simple_ping(pcap_addr):
201            self._pcap_host = site_linux_system.LinuxSystem(
202                              hosts.create_host(pcap_addr),'pcap')
203        # The attenuator host gives us the ability to attenuate particular
204        # antennas on the router.  Most setups don't have this capability
205        # and most tests do not require it.  We use this for RvR
206        # (network_WiFi_AttenuatedPerf) and some roaming tests.
207        attenuator_addr = dnsname_mangler.get_attenuator_addr(
208                self.client.host.hostname,
209                cmdline_override=self._cmdline_args.get(
210                        self.CMDLINE_ATTEN_ADDR, None),
211                allow_failure=True)
212        if attenuator_addr and ping_helper.simple_ping(attenuator_addr):
213            self._attenuator = attenuator_controller.AttenuatorController(
214                    hosts.SSHHost(attenuator_addr, port=22))
215        # Set up a clean context to conduct WiFi tests in.
216        self.client.shill.init_test_network_state()
217        if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args:
218            self._enable_client_packet_captures = True
219        if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args:
220            self._enable_packet_captures = True
221        if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args:
222            self._packet_capture_snaplen = int(
223                    self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN])
224        self.client.conductive = self._get_bool_cmdline_value(
225                self.CMDLINE_CONDUCTIVE_RIG, None)
226        for system in (self.client, self.router):
227            system.sync_host_times()
228
229
230    def teardown(self):
231        """Teardown the state used in a WiFi test."""
232        logging.debug('Tearing down the test context.')
233        for system in [self._attenuator, self._client_proxy,
234                       self._router, self._pcap_host]:
235            if system is not None:
236                system.close()
237
238
239    def assert_connect_wifi(self, wifi_params, description=None):
240        """Connect to a WiFi network and check for success.
241
242        Connect a DUT to a WiFi network and check that we connect successfully.
243
244        @param wifi_params AssociationParameters describing network to connect.
245        @param description string Additional text for logging messages.
246
247        @returns AssociationResult if successful; None if wifi_params
248                 contains expect_failure; asserts otherwise.
249
250        """
251        if description:
252            connect_name = '%s (%s)' % (wifi_params.ssid, description)
253        else:
254            connect_name = '%s' % wifi_params.ssid
255        logging.info('Connecting to %s.', connect_name)
256        assoc_result = xmlrpc_datatypes.deserialize(
257                self.client.shill.connect_wifi(wifi_params))
258        logging.info('Finished connection attempt to %s with times: '
259                     'discovery=%.2f, association=%.2f, configuration=%.2f.',
260                     connect_name,
261                     assoc_result.discovery_time,
262                     assoc_result.association_time,
263                     assoc_result.configuration_time)
264
265        if assoc_result.success and wifi_params.expect_failure:
266            raise error.TestFail(
267                'Expected connection to %s to fail, but it was successful.' %
268                connect_name)
269
270        if not assoc_result.success and not wifi_params.expect_failure:
271            raise error.TestFail(
272                'Expected connection to %s to succeed, '
273                'but it failed with reason: %s.' % (
274                    connect_name, assoc_result.failure_reason))
275
276        if wifi_params.expect_failure:
277            logging.info('Unable to connect to %s, as intended.',
278                         connect_name)
279            return None
280
281        logging.info('Connected successfully to %s.', connect_name)
282        return assoc_result
283
284
285    def assert_ping_from_dut(self, ping_config=None, ap_num=None):
286        """Ping a host on the WiFi network from the DUT.
287
288        Ping a host reachable on the WiFi network from the DUT, and
289        check that the ping is successful.  The host we ping depends
290        on the test setup, sometimes that host may be the server and
291        sometimes it will be the router itself.  Ping-ability may be
292        used to confirm that a WiFi network is operating correctly.
293
294        @param ping_config optional PingConfig object to override defaults.
295        @param ap_num int which AP to ping if more than one is configured.
296
297        """
298        if ap_num is None:
299            ap_num = 0
300        if ping_config is None:
301            ping_ip = self.router.get_wifi_ip(ap_num=ap_num)
302            ping_config = ping_runner.PingConfig(ping_ip)
303        self.client.ping(ping_config)
304
305
306    def assert_ping_from_server(self, ping_config=None):
307        """Ping the DUT across the WiFi network from the server.
308
309        Check that the ping is mostly successful and fail the test if it
310        is not.
311
312        @param ping_config optional PingConfig object to override defaults.
313
314        """
315        logging.info('Pinging from server.')
316        if ping_config is None:
317            ping_ip = self.client.wifi_ip
318            ping_config = ping_runner.PingConfig(ping_ip)
319        self.router.ping(ping_config)
320
321
322    def wait_for_connection(self, ssid, freq=None, ap_num=None,
323                            timeout_seconds=30):
324        """Verifies a connection to network ssid on frequency freq.
325
326        @param ssid string ssid of the network to check.
327        @param freq int frequency of network to check.
328        @param ap_num int AP to which to connect
329        @param timeout_seconds int number of seconds to wait for
330                connection on the given frequency.
331
332        @returns a named tuple of (state, time)
333        """
334        if ap_num is None:
335            ap_num = 0
336        desired_subnet = self.router.get_wifi_ip_subnet(ap_num)
337        wifi_ip = self.router.get_wifi_ip(ap_num)
338        return self.client.wait_for_connection(
339                ssid, timeout_seconds=timeout_seconds, freq=freq,
340                ping_ip=wifi_ip, desired_subnet=desired_subnet)
341