1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6
7from autotest_lib.client.common_lib import error
8from autotest_lib.client.common_lib.cros.network import ping_runner
9from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
10from autotest_lib.server import hosts
11from autotest_lib.server import site_linux_router
12from autotest_lib.server import site_linux_system
13from autotest_lib.server.cros import dnsname_mangler
14from autotest_lib.server.cros.network import attenuator_controller
15from autotest_lib.server.cros.network import wifi_client
16
17class WiFiTestContextManager(object):
18    """A context manager for state used in WiFi autotests.
19
20    Some of the building blocks we use in WiFi tests need to be cleaned up
21    after use.  For instance, we start an XMLRPC server on the client
22    which should be shut down so that the next test can start its instance.
23    It is convenient to manage this setup and teardown through a context
24    manager rather than building it into the test class logic.
25
26    """
27    CMDLINE_ATTEN_ADDR = 'atten_addr'
28    CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture'
29    CMDLINE_CONDUCTIVE_RIG = 'conductive_rig'
30    CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen'
31    CMDLINE_ROUTER_ADDR = 'router_addr'
32    CMDLINE_PACKET_CAPTURES = 'packet_capture'
33    CMDLINE_USE_WPA_CLI = 'use_wpa_cli'
34
35
36    @property
37    def attenuator(self):
38        """@return attenuator object (e.g. a BeagleBone)."""
39        if self._attenuator is None:
40            raise error.TestNAError('No attenuator available in this setup.')
41
42        return self._attenuator
43
44
45    @property
46    def client(self):
47        """@return WiFiClient object abstracting the DUT."""
48        return self._client_proxy
49
50
51    @property
52    def router(self):
53        """@return router object (e.g. a LinuxCrosRouter)."""
54        return self._router
55
56
57    @property
58    def pcap_host(self):
59        """@return Dedicated packet capture host or None."""
60        return self._pcap_host
61
62
63    @property
64    def capture_host(self):
65        """@return Dedicated pcap_host or the router itself."""
66        return self.pcap_host or self.router
67
68
69    def __init__(self, test_name, host, cmdline_args, debug_dir):
70        """Construct a WiFiTestContextManager.
71
72        Optionally can pull addresses of the server address, router address,
73        or router port from cmdline_args.
74
75        @param test_name string descriptive name for this test.
76        @param host host object representing the DUT.
77        @param cmdline_args dict of key, value settings from command line.
78
79        """
80        super(WiFiTestContextManager, self).__init__()
81        self._test_name = test_name
82        self._cmdline_args = cmdline_args.copy()
83        self._client_proxy = wifi_client.WiFiClient(
84                host, debug_dir,
85                self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False))
86        self._attenuator = None
87        self._router = None
88        self._pcap_host = None
89        self._enable_client_packet_captures = False
90        self._enable_packet_captures = False
91        self._packet_capture_snaplen = None
92
93
94    def __enter__(self):
95        self.setup()
96        return self
97
98
99    def __exit__(self, exc_type, exc_value, traceback):
100        self.teardown()
101
102
103    def _get_bool_cmdline_value(self, key, default_value):
104        """Returns a bool value for the given key from the cmdline args.
105
106        @param key string cmdline args key.
107        @param default_value value to return if the key is not specified in the
108               cmdline args.
109
110        @return True/False or default_value if key is not specified in the
111                cmdline args.
112
113        """
114        if key in self._cmdline_args:
115            value = self._cmdline_args[key].lower()
116            if value in ('1', 'true', 'yes', 'y'):
117                return True
118            else:
119                return False
120        else:
121            return default_value
122
123
124    def get_wifi_addr(self, ap_num=0):
125        """Return an IPv4 address pingable by the client on the WiFi subnet.
126
127        @param ap_num int number of AP.  Only used in stumpy cells.
128        @return string IPv4 address.
129
130        """
131        return self.router.local_server_address(ap_num)
132
133
134    def get_wifi_if(self, ap_num=0):
135        """Returns the interface name for the IP address of self.get_wifi_addr.
136
137        @param ap_num int number of AP.  Only used in stumpy cells.
138        @return string interface name "e.g. wlan0".
139
140        """
141        return self.router.get_hostapd_interface(ap_num)
142
143
144    def get_wifi_host(self):
145        """@return host object representing a pingable machine."""
146        return self.router.host
147
148
149    def configure(self, ap_config, multi_interface=None, is_ibss=None):
150        """Configure a router with the given config.
151
152        Configures an AP according to the specified config and
153        enables whatever packet captures are appropriate.  Will deconfigure
154        existing APs unless |multi_interface| is specified.
155
156        @param ap_config HostapConfig object.
157        @param multi_interface True iff having multiple configured interfaces
158                is expected for this configure call.
159        @param is_ibss True iff this is an IBSS endpoint.
160
161        """
162        if not self.client.is_frequency_supported(ap_config.frequency):
163            raise error.TestNAError('DUT does not support frequency: %s' %
164                                    ap_config.frequency)
165        if ap_config.require_vht:
166            self.client.require_capabilities(
167                    [site_linux_system.LinuxSystem.CAPABILITY_VHT])
168        ap_config.security_config.install_router_credentials(self.router.host)
169        if is_ibss:
170            if multi_interface:
171                raise error.TestFail('IBSS mode does not support multiple '
172                                     'interfaces.')
173            if not self.client.is_ibss_supported():
174                raise error.TestNAError('DUT does not support IBSS mode')
175            self.router.ibss_configure(ap_config)
176        else:
177            self.router.hostap_configure(ap_config,
178                                         multi_interface=multi_interface)
179        if self._enable_client_packet_captures:
180            self.client.start_capture(ap_config.frequency,
181                                      snaplen=self._packet_capture_snaplen)
182        if self._enable_packet_captures:
183           self.capture_host.start_capture(ap_config.frequency,
184                    ht_type=ap_config.ht_packet_capture_mode,
185                    snaplen=self._packet_capture_snaplen)
186
187
188    def setup(self):
189        """Construct the state used in a WiFi test."""
190        self._router = site_linux_router.build_router_proxy(
191                test_name=self._test_name,
192                client_hostname=self.client.host.hostname,
193                router_addr=self._cmdline_args.get(self.CMDLINE_ROUTER_ADDR,
194                                                   None))
195        ping_helper = ping_runner.PingRunner()
196        pcap_addr = dnsname_mangler.get_pcap_addr(
197                self.client.host.hostname,
198                allow_failure=True)
199        if pcap_addr and ping_helper.simple_ping(pcap_addr):
200            self._pcap_host = site_linux_system.LinuxSystem(
201                              hosts.create_host(pcap_addr),'pcap')
202        # The attenuator host gives us the ability to attenuate particular
203        # antennas on the router.  Most setups don't have this capability
204        # and most tests do not require it.  We use this for RvR
205        # (network_WiFi_AttenuatedPerf) and some roaming tests.
206        attenuator_addr = dnsname_mangler.get_attenuator_addr(
207                self.client.host.hostname,
208                cmdline_override=self._cmdline_args.get(
209                        self.CMDLINE_ATTEN_ADDR, None),
210                allow_failure=True)
211        if attenuator_addr and ping_helper.simple_ping(attenuator_addr):
212            self._attenuator = attenuator_controller.AttenuatorController(
213                    attenuator_addr)
214        # Set up a clean context to conduct WiFi tests in.
215        self.client.shill.init_test_network_state()
216        if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args:
217            self._enable_client_packet_captures = True
218        if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args:
219            self._enable_packet_captures = True
220        if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args:
221            self._packet_capture_snaplen = int(
222                    self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN])
223        self.client.conductive = self._get_bool_cmdline_value(
224                self.CMDLINE_CONDUCTIVE_RIG, None)
225        for system in (self.client, self.router):
226            system.sync_host_times()
227
228
229    def teardown(self):
230        """Teardown the state used in a WiFi test."""
231        logging.debug('Tearing down the test context.')
232        for system in [self._attenuator, self._client_proxy,
233                       self._router, self._pcap_host]:
234            if system is not None:
235                system.close()
236
237
238    def assert_connect_wifi(self, wifi_params, description=None):
239        """Connect to a WiFi network and check for success.
240
241        Connect a DUT to a WiFi network and check that we connect successfully.
242
243        @param wifi_params AssociationParameters describing network to connect.
244        @param description string Additional text for logging messages.
245
246        @returns AssociationResult if successful; None if wifi_params
247                 contains expect_failure; asserts otherwise.
248
249        """
250        if description:
251            connect_name = '%s (%s)' % (wifi_params.ssid, description)
252        else:
253            connect_name = '%s' % wifi_params.ssid
254        logging.info('Connecting to %s.', connect_name)
255        assoc_result = xmlrpc_datatypes.deserialize(
256                self.client.shill.connect_wifi(wifi_params))
257        logging.info('Finished connection attempt to %s with times: '
258                     'discovery=%.2f, association=%.2f, configuration=%.2f.',
259                     connect_name,
260                     assoc_result.discovery_time,
261                     assoc_result.association_time,
262                     assoc_result.configuration_time)
263
264        if assoc_result.success and wifi_params.expect_failure:
265            raise error.TestFail(
266                'Expected connection to %s to fail, but it was successful.' %
267                connect_name)
268
269        if not assoc_result.success and not wifi_params.expect_failure:
270            raise error.TestFail(
271                'Expected connection to %s to succeed, '
272                'but it failed with reason: %s.' % (
273                    connect_name, assoc_result.failure_reason))
274
275        if wifi_params.expect_failure:
276            logging.info('Unable to connect to %s, as intended.',
277                         connect_name)
278            return None
279
280        logging.info('Connected successfully to %s, signal level: %r.',
281                     connect_name, self.client.wifi_signal_level)
282        return assoc_result
283
284
285    def assert_ping_from_dut(self, ping_config=None, ap_num=None):
286        """Ping a host on the WiFi network from the DUT.
287
288        Ping a host reachable on the WiFi network from the DUT, and
289        check that the ping is successful.  The host we ping depends
290        on the test setup, sometimes that host may be the server and
291        sometimes it will be the router itself.  Ping-ability may be
292        used to confirm that a WiFi network is operating correctly.
293
294        @param ping_config optional PingConfig object to override defaults.
295        @param ap_num int which AP to ping if more than one is configured.
296
297        """
298        if ap_num is None:
299            ap_num = 0
300        if ping_config is None:
301            ping_ip = self.router.get_wifi_ip(ap_num=ap_num)
302            ping_config = ping_runner.PingConfig(ping_ip)
303        self.client.ping(ping_config)
304
305
306    def assert_ping_from_server(self, ping_config=None):
307        """Ping the DUT across the WiFi network from the server.
308
309        Check that the ping is mostly successful and fail the test if it
310        is not.
311
312        @param ping_config optional PingConfig object to override defaults.
313
314        """
315        logging.info('Pinging from server.')
316        if ping_config is None:
317            ping_ip = self.client.wifi_ip
318            ping_config = ping_runner.PingConfig(ping_ip)
319        self.router.ping(ping_config)
320
321
322    def wait_for_connection(self, ssid, freq=None, ap_num=None,
323                            timeout_seconds=30):
324        """Verifies a connection to network ssid on frequency freq.
325
326        @param ssid string ssid of the network to check.
327        @param freq int frequency of network to check.
328        @param ap_num int AP to which to connect
329        @param timeout_seconds int number of seconds to wait for
330                connection on the given frequency.
331
332        @returns a named tuple of (state, time)
333        """
334        if ap_num is None:
335            ap_num = 0
336        desired_subnet = self.router.get_wifi_ip_subnet(ap_num)
337        wifi_ip = self.router.get_wifi_ip(ap_num)
338        return self.client.wait_for_connection(
339                ssid, timeout_seconds=timeout_seconds, freq=freq,
340                ping_ip=wifi_ip, desired_subnet=desired_subnet)
341