1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6
7from autotest_lib.client.common_lib import error
8from autotest_lib.client.common_lib.cros.network import ping_runner
9from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes
10from autotest_lib.server import hosts
11from autotest_lib.server import site_linux_router
12from autotest_lib.server import site_linux_system
13from autotest_lib.server.cros import dnsname_mangler
14from autotest_lib.server.cros.network import attenuator_controller
15from autotest_lib.server.cros.network import wifi_client
16
17class WiFiTestContextManager(object):
18    """A context manager for state used in WiFi autotests.
19
20    Some of the building blocks we use in WiFi tests need to be cleaned up
21    after use.  For instance, we start an XMLRPC server on the client
22    which should be shut down so that the next test can start its instance.
23    It is convenient to manage this setup and teardown through a context
24    manager rather than building it into the test class logic.
25
26    """
27    CMDLINE_ATTEN_ADDR = 'atten_addr'
28    CMDLINE_CLIENT_PACKET_CAPTURES = 'client_capture'
29    CMDLINE_CONDUCTIVE_RIG = 'conductive_rig'
30    CMDLINE_PACKET_CAPTURE_SNAPLEN = 'capture_snaplen'
31    CMDLINE_ROUTER_ADDR = 'router_addr'
32    CMDLINE_PCAP_ADDR = 'pcap_addr'
33    CMDLINE_PACKET_CAPTURES = 'packet_capture'
34    CMDLINE_USE_WPA_CLI = 'use_wpa_cli'
35
36
37    @property
38    def attenuator(self):
39        """@return attenuator object (e.g. a BeagleBone)."""
40        if self._attenuator is None:
41            raise error.TestNAError('No attenuator available in this setup.')
42
43        return self._attenuator
44
45
46    @property
47    def client(self):
48        """@return WiFiClient object abstracting the DUT."""
49        return self._client_proxy
50
51
52    @property
53    def router(self):
54        """@return router object (e.g. a LinuxCrosRouter)."""
55        return self._router
56
57
58    @property
59    def pcap_host(self):
60        """@return Dedicated packet capture host or None."""
61        return self._pcap_host
62
63
64    @property
65    def capture_host(self):
66        """@return Dedicated pcap_host or the router itself."""
67        return self.pcap_host or self.router
68
69
70    def __init__(self, test_name, host, cmdline_args, debug_dir):
71        """Construct a WiFiTestContextManager.
72
73        Optionally can pull addresses of the server address, router address,
74        or router port from cmdline_args.
75
76        @param test_name string descriptive name for this test.
77        @param host host object representing the DUT.
78        @param cmdline_args dict of key, value settings from command line.
79
80        """
81        super(WiFiTestContextManager, self).__init__()
82        self._test_name = test_name
83        self._cmdline_args = cmdline_args.copy()
84        self._client_proxy = wifi_client.WiFiClient(
85                host, debug_dir,
86                self._get_bool_cmdline_value(self.CMDLINE_USE_WPA_CLI, False))
87        self._attenuator = None
88        self._router = None
89        self._pcap_host = None
90        self._enable_client_packet_captures = False
91        self._enable_packet_captures = False
92        self._packet_capture_snaplen = None
93
94
95    def __enter__(self):
96        self.setup()
97        return self
98
99
100    def __exit__(self, exc_type, exc_value, traceback):
101        self.teardown()
102
103
104    def _get_bool_cmdline_value(self, key, default_value):
105        """Returns a bool value for the given key from the cmdline args.
106
107        @param key string cmdline args key.
108        @param default_value value to return if the key is not specified in the
109               cmdline args.
110
111        @return True/False or default_value if key is not specified in the
112                cmdline args.
113
114        """
115        if key in self._cmdline_args:
116            value = self._cmdline_args[key].lower()
117            if value in ('1', 'true', 'yes', 'y'):
118                return True
119            else:
120                return False
121        else:
122            return default_value
123
124
125    def get_wifi_addr(self, ap_num=0):
126        """Return an IPv4 address pingable by the client on the WiFi subnet.
127
128        @param ap_num int number of AP.  Only used in stumpy cells.
129        @return string IPv4 address.
130
131        """
132        return self.router.local_server_address(ap_num)
133
134
135    def get_wifi_if(self, ap_num=0):
136        """Returns the interface name for the IP address of self.get_wifi_addr.
137
138        @param ap_num int number of AP.  Only used in stumpy cells.
139        @return string interface name "e.g. wlan0".
140
141        """
142        return self.router.get_hostapd_interface(ap_num)
143
144
145    def get_wifi_host(self):
146        """@return host object representing a pingable machine."""
147        return self.router.host
148
149
150    def configure(self, ap_config, multi_interface=None, is_ibss=None,
151                  configure_pcap=False):
152        """Configure a router with the given config.
153
154        Configures an AP according to the specified config and
155        enables whatever packet captures are appropriate.  Will deconfigure
156        existing APs unless |multi_interface| is specified.
157
158        @param ap_config HostapConfig object.
159        @param multi_interface True iff having multiple configured interfaces
160                is expected for this configure call.
161        @param is_ibss True iff this is an IBSS endpoint.
162        @param configure_pcap True iff pcap_host should be configured for this
163                configure call. Raises a TestNAError if |self._pcap_as_router|
164                is False.
165        """
166        if configure_pcap and not self._pcap_as_router:
167            raise error.TestNAError('pcap was not configured as router.')
168        if not self.client.is_frequency_supported(ap_config.frequency):
169            raise error.TestNAError('DUT does not support frequency: %s' %
170                                    ap_config.frequency)
171        if ap_config.require_vht:
172            self.client.require_capabilities(
173                    [site_linux_system.LinuxSystem.CAPABILITY_VHT])
174        router = self.router
175        if configure_pcap:
176            router = self.pcap_host
177        ap_config.security_config.install_router_credentials(router.host,
178                                                             router.logdir)
179        if is_ibss:
180            if multi_interface:
181                raise error.TestFail('IBSS mode does not support multiple '
182                                     'interfaces.')
183            if not self.client.is_ibss_supported():
184                raise error.TestNAError('DUT does not support IBSS mode')
185            router.ibss_configure(ap_config)
186        else:
187            router.hostap_configure(ap_config, multi_interface=multi_interface)
188        if self._enable_client_packet_captures:
189            self.client.start_capture(ap_config.frequency,
190                                      snaplen=self._packet_capture_snaplen)
191        if self._enable_packet_captures:
192           self.capture_host.start_capture(ap_config.frequency,
193                    width_type=ap_config.packet_capture_mode,
194                    snaplen=self._packet_capture_snaplen)
195
196
197    def _setup_router(self):
198        """Set up the router device."""
199        self._router = site_linux_router.build_router_proxy(
200                test_name=self._test_name,
201                client_hostname=self.client.host.hostname,
202                router_addr=self._cmdline_args.get(
203                        self.CMDLINE_ROUTER_ADDR, None))
204        self.router.sync_host_times()
205
206
207    def _setup_pcap(self, pcap_as_router=False):
208        """
209        Set up the pcap device.
210
211        @param pcap_as_router optional bool which should be True if the pcap
212                should be configured as a router.
213        """
214        ping_helper = ping_runner.PingRunner()
215        pcap_addr = dnsname_mangler.get_pcap_addr(
216                self.client.host.hostname,
217                allow_failure=True,
218                cmdline_override=self._cmdline_args.get(self.CMDLINE_PCAP_ADDR,
219                                                        None))
220        if pcap_addr and ping_helper.simple_ping(pcap_addr):
221            if pcap_as_router:
222                self._pcap_host = site_linux_router.LinuxRouter(
223                        hosts.create_host(pcap_addr),
224                        role='pcap',
225                        test_name=self._test_name)
226            else:
227                self._pcap_host = site_linux_system.LinuxSystem(
228                                  hosts.create_host(pcap_addr),'pcap')
229
230
231    def _setup_attenuator(self):
232        """
233        Set up the attenuator device.
234
235        The attenuator host gives us the ability to attenuate particular
236        antennas on the router.  Most setups don't have this capability
237        and most tests do not require it.  We use this for RvR
238        (network_WiFi_AttenuatedPerf) and some roaming tests.
239        """
240        ping_helper = ping_runner.PingRunner()
241        attenuator_addr = dnsname_mangler.get_attenuator_addr(
242                self.client.host.hostname,
243                cmdline_override=self._cmdline_args.get(
244                        self.CMDLINE_ATTEN_ADDR, None),
245                allow_failure=True)
246        if attenuator_addr and ping_helper.simple_ping(attenuator_addr):
247            self._attenuator = attenuator_controller.AttenuatorController(
248                    attenuator_addr)
249
250    def setup(self, include_router=True, include_pcap=True,
251            include_attenuator=True, pcap_as_router=False):
252        """
253        Construct the state used in a WiFi test.
254
255        @param include_router optional bool which should be False if the router
256                is not used by the test
257        @param include_pcap optional bool which should be False if the pcap
258                device is not used by the test
259        @param include_attenuator optional bool which should be False if the
260                attenuator is not used by the test
261        @param pcap_as_router optional bool which should be True if the pcap
262                should be configured as a router.
263        """
264        if include_router:
265            self._setup_router()
266        if include_pcap:
267            self._setup_pcap(pcap_as_router)
268            self._pcap_as_router = pcap_as_router
269        if include_attenuator:
270            self._setup_attenuator()
271
272        # Set up a clean context to conduct WiFi tests in.
273        self.client.shill.init_test_network_state()
274        self.client.sync_host_times()
275
276        if self.CMDLINE_CLIENT_PACKET_CAPTURES in self._cmdline_args:
277            self._enable_client_packet_captures = True
278        if self.CMDLINE_PACKET_CAPTURES in self._cmdline_args:
279            self._enable_packet_captures = True
280        if self.CMDLINE_PACKET_CAPTURE_SNAPLEN in self._cmdline_args:
281            self._packet_capture_snaplen = int(
282                    self._cmdline_args[self.CMDLINE_PACKET_CAPTURE_SNAPLEN])
283        self.client.conductive = self._get_bool_cmdline_value(
284                self.CMDLINE_CONDUCTIVE_RIG, None)
285
286
287    def teardown(self):
288        """Teardown the state used in a WiFi test."""
289        logging.debug('Tearing down the test context.')
290        for system in [self._attenuator, self._client_proxy,
291                       self._router, self._pcap_host]:
292            if system is not None:
293                system.close()
294
295
296    def assert_connect_wifi(self, wifi_params, description=None):
297        """Connect to a WiFi network and check for success.
298
299        Connect a DUT to a WiFi network and check that we connect successfully.
300
301        @param wifi_params AssociationParameters describing network to connect.
302        @param description string Additional text for logging messages.
303
304        @returns AssociationResult if successful; None if wifi_params
305                 contains expect_failure; asserts otherwise.
306
307        """
308        if description:
309            connect_name = '%s (%s)' % (wifi_params.ssid, description)
310        else:
311            connect_name = '%s' % wifi_params.ssid
312        logging.info('Connecting to %s.', connect_name)
313        assoc_result = xmlrpc_datatypes.deserialize(
314                self.client.shill.connect_wifi(wifi_params))
315        logging.info('Finished connection attempt to %s with times: '
316                     'discovery=%.2f, association=%.2f, configuration=%.2f.',
317                     connect_name,
318                     assoc_result.discovery_time,
319                     assoc_result.association_time,
320                     assoc_result.configuration_time)
321
322        if assoc_result.success and wifi_params.expect_failure:
323            raise error.TestFail(
324                'Expected connection to %s to fail, but it was successful.' %
325                connect_name)
326
327        if not assoc_result.success and not wifi_params.expect_failure:
328            raise error.TestFail(
329                'Expected connection to %s to succeed, '
330                'but it failed with reason: %s.' % (
331                    connect_name, assoc_result.failure_reason))
332
333        if wifi_params.expect_failure:
334            logging.info('Unable to connect to %s, as intended.',
335                         connect_name)
336            return None
337
338        logging.info('Connected successfully to %s, signal level: %r.',
339                     connect_name, self.client.wifi_signal_level)
340        return assoc_result
341
342
343    def assert_ping_from_dut(self, ping_config=None, ap_num=None):
344        """Ping a host on the WiFi network from the DUT.
345
346        Ping a host reachable on the WiFi network from the DUT, and
347        check that the ping is successful.  The host we ping depends
348        on the test setup, sometimes that host may be the server and
349        sometimes it will be the router itself.  Ping-ability may be
350        used to confirm that a WiFi network is operating correctly.
351
352        @param ping_config optional PingConfig object to override defaults.
353        @param ap_num int which AP to ping if more than one is configured.
354
355        """
356        if ap_num is None:
357            ap_num = 0
358        if ping_config is None:
359            ping_ip = self.router.get_wifi_ip(ap_num=ap_num)
360            ping_config = ping_runner.PingConfig(ping_ip)
361        self.client.ping(ping_config)
362
363
364    def assert_ping_from_server(self, ping_config=None):
365        """Ping the DUT across the WiFi network from the server.
366
367        Check that the ping is mostly successful and fail the test if it
368        is not.
369
370        @param ping_config optional PingConfig object to override defaults.
371
372        """
373        logging.info('Pinging from server.')
374        if ping_config is None:
375            ping_ip = self.client.wifi_ip
376            ping_config = ping_runner.PingConfig(ping_ip)
377        self.router.ping(ping_config)
378
379
380    def wait_for_connection(self, ssid, freq=None, ap_num=None,
381                            timeout_seconds=30):
382        """Verifies a connection to network ssid on frequency freq.
383
384        @param ssid string ssid of the network to check.
385        @param freq int frequency of network to check.
386        @param ap_num int AP to which to connect
387        @param timeout_seconds int number of seconds to wait for
388                connection on the given frequency.
389
390        @returns a named tuple of (state, time)
391        """
392        if ap_num is None:
393            ap_num = 0
394        desired_subnet = self.router.get_wifi_ip_subnet(ap_num)
395        wifi_ip = self.router.get_wifi_ip(ap_num)
396        return self.client.wait_for_connection(
397                ssid, timeout_seconds=timeout_seconds, freq=freq,
398                ping_ip=wifi_ip, desired_subnet=desired_subnet)
399