1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""
7Base class for DHCP tests.  This class just sets up a little bit of plumbing,
8like a virtual ethernet device with one end that looks like a real ethernet
9device to shill and a DHCP test server on the end that doesn't look like a real
10ethernet interface to shill.  Child classes should override test_body() with the
11logic of their test.  The plumbing of DhcpTestBase is accessible via properties.
12"""
13
14from __future__ import absolute_import
15from __future__ import division
16from __future__ import print_function
17
18import logging
19from six.moves import filter
20from six.moves import range
21import socket
22import struct
23import time
24import traceback
25
26from autotest_lib.client.bin import test
27from autotest_lib.client.common_lib import error
28from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
29from autotest_lib.client.cros import dhcp_handling_rule
30from autotest_lib.client.cros import dhcp_packet
31from autotest_lib.client.cros import dhcp_test_server
32from autotest_lib.client.cros.networking import shill_proxy
33
34
35# These are keys that may be used with the DBus dictionary returned from
36# DhcpTestBase.get_interface_ipconfig().
37DHCPCD_KEY_NAMESERVERS = 'NameServers'
38DHCPCD_KEY_GATEWAY = 'Gateway'
39DHCPCD_KEY_BROADCAST_ADDR = 'Broadcast'
40DHCPCD_KEY_ADDRESS = 'Address'
41DHCPCD_KEY_PREFIX_LENGTH = 'Prefixlen'
42DHCPCD_KEY_DOMAIN_NAME = 'DomainName'
43DHCPCD_KEY_ACCEPTED_HOSTNAME = 'AcceptedHostname'
44DHCPCD_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'
45
46# We should be able to complete a DHCP negotiation in this amount of time.
47DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10
48
49# After DHCP completes, an ipconfig should appear shortly after
50IPCONFIG_POLL_COUNT = 5
51IPCONFIG_POLL_PERIOD_SECONDS = 0.5
52
53class DhcpTestBase(test.test):
54    """Parent class for tests that work verify DHCP behavior."""
55    version = 1
56
57    @staticmethod
58    def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix):
59        """
60        Create a new IPv4 address in a subnet by bitwise and'ing an existing
61        address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in
62        |ip_suffix|.  For safety, bitwise or the suffix with the complement of
63        the subnet mask.
64
65        Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105")
66
67        The example usage will return "192.168.1.105".
68
69        @param subnet_mask string subnet mask, e.g. "255.255.255.0"
70        @param ip_in_subnet string an IP address in the desired subnet
71        @param ip_suffix string suffix desired for new address, e.g. "0.0.0.105"
72
73        @return string IP address on in the same subnet with specified suffix.
74
75        """
76        mask = struct.unpack('!I', socket.inet_aton(subnet_mask))[0]
77        subnet = mask & struct.unpack('!I', socket.inet_aton(ip_in_subnet))[0]
78        suffix = ~mask & struct.unpack('!I', socket.inet_aton(ip_suffix))[0]
79        return socket.inet_ntoa(struct.pack('!I', (subnet | suffix)))
80
81
82    def get_device(self, interface_name):
83        """Finds the corresponding Device object for an interface with
84        the name |interface_name|.
85
86        @param interface_name string The name of the interface to check.
87
88        @return DBus interface object representing the associated device.
89
90        """
91        return self.shill_proxy.find_object('Device',
92                                            {'Name': interface_name})
93
94
95    def find_ethernet_service(self, interface_name):
96        """Finds the corresponding service object for an Ethernet interface.
97
98        @param interface_name string The name of the associated interface
99
100        @return Service object representing the associated service.
101
102        """
103        device = self.get_device(interface_name)
104        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
105        return self.shill_proxy.find_object('Service', {'Device': device_path})
106
107
108    def get_interface_ipconfig_objects(self, interface_name):
109        """
110        Returns a list of dbus object proxies for |interface_name|.
111        Returns an empty list if no such interface exists.
112
113        @param interface_name string name of the device to query (e.g., "eth0").
114
115        @return list of objects representing DBus IPConfig RPC endpoints.
116
117        """
118        device = self.get_device(interface_name)
119        if device is None:
120            return []
121
122        device_properties = device.GetProperties(utf8_strings=True)
123        proxy = self.shill_proxy
124
125        ipconfig_object = proxy.DBUS_TYPE_IPCONFIG
126        return list(filter(bool,
127                      [ proxy.get_dbus_object(ipconfig_object, property_path)
128                        for property_path in device_properties['IPConfigs'] ]))
129
130
131    def get_interface_ipconfig(self, interface_name):
132        """
133        Returns a dictionary containing settings for an |interface_name| set
134        via DHCP.  Returns None if no such interface or setting bundle on
135        that interface can be found in shill.
136
137        @param interface_name string name of the device to query (e.g., "eth0").
138
139        @return dict containing the the properties of the IPConfig stripped
140            of DBus meta-data or None.
141
142        """
143        dhcp_properties = None
144        for ipconfig in self.get_interface_ipconfig_objects(interface_name):
145          logging.info('Looking at ipconfig %r', ipconfig)
146          ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
147          if 'Method' not in ipconfig_properties:
148              logging.info('Found ipconfig object with no method field')
149              continue
150          if ipconfig_properties['Method'] != 'dhcp':
151              logging.info('Found ipconfig object with method != dhcp')
152              continue
153          if dhcp_properties != None:
154              raise error.TestFail('Found multiple ipconfig objects '
155                                   'with method == dhcp')
156          dhcp_properties = ipconfig_properties
157        if dhcp_properties is None:
158            logging.info('Did not find IPConfig object with method == dhcp')
159            return None
160        logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
161        return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)
162
163
164    def run_once(self):
165        self._server = None
166        self._server_ip = None
167        self._ethernet_pair = None
168        self._server = None
169        self._shill_proxy = shill_proxy.ShillProxy()
170        try:
171            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
172                    peer_interface_name='pseudoethernet0',
173                    peer_interface_ip=None)
174            self._ethernet_pair.setup()
175            if not self._ethernet_pair.is_healthy:
176                raise error.TestFail('Could not create virtual ethernet pair.')
177            self._server_ip = self._ethernet_pair.interface_ip
178            self._server = dhcp_test_server.DhcpTestServer(
179                    self._ethernet_pair.interface_name)
180            self._server.start()
181            if not self._server.is_healthy:
182                raise error.TestFail('Could not start DHCP test server.')
183            self._subnet_mask = self._ethernet_pair.interface_subnet_mask
184            self.test_body()
185        except (error.TestFail, error.TestNAError):
186            # Pass these through without modification.
187            raise
188        except Exception as e:
189            logging.error('Caught exception: %s.', str(e))
190            logging.error('Trace: %s', traceback.format_exc())
191            raise error.TestFail('Caught exception: %s.' % str(e))
192        finally:
193            if self._server is not None:
194                self._server.stop()
195            if self._ethernet_pair is not None:
196                self._ethernet_pair.teardown()
197
198    def test_body(self):
199        """
200        Override this method with the body of your test.  You may safely assume
201        that the the properties exposed by DhcpTestBase correctly return
202        references to the test apparatus.
203        """
204        raise error.TestFail('No test body implemented')
205
206    @property
207    def server_ip(self):
208        """
209        Return the IP address of the side of the interface that the DHCP test
210        server is bound to.  The server itself is bound the the broadcast
211        address on the interface.
212        """
213        return self._server_ip
214
215    @property
216    def server(self):
217        """
218        Returns a reference to the DHCP test server.  Use this to add handlers
219        and run tests.
220        """
221        return self._server
222
223    @property
224    def ethernet_pair(self):
225        """
226        Returns a reference to the virtual ethernet pair created to run DHCP
227        tests on.
228        """
229        return self._ethernet_pair
230
231    @property
232    def shill_proxy(self):
233        """
234        Returns a the shill proxy instance.
235        """
236        return self._shill_proxy
237
238    def negotiate_and_check_lease(self,
239                                  dhcp_options,
240                                  custom_fields={},
241                                  disable_check=False):
242        """
243        Perform DHCP lease negotiation, and ensure that the resulting
244        ipconfig matches the DHCP options provided to the server.
245
246        @param dhcp_options dict of properties the DHCP server should provide.
247        @param custom_fields dict of custom DHCP parameters to add to server.
248        @param disable_check bool whether to perform IPConfig parameter
249             checking.
250
251        """
252        if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options:
253            raise error.TestFail('You must specify OPTION_REQUESTED_IP to '
254                                 'negotiate a DHCP lease')
255        intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP]
256        # Build up the handling rules for the server and start the test.
257        rules = []
258        rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
259                intended_ip,
260                self.server_ip,
261                dhcp_options,
262                custom_fields))
263        rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
264                intended_ip,
265                self.server_ip,
266                dhcp_options,
267                custom_fields))
268        rules[-1].is_final_handler = True
269        self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS)
270        logging.info('Server is negotiating new lease with options: %s',
271                     dhcp_options)
272        self.server.wait_for_test_to_finish()
273        if not self.server.last_test_passed:
274            raise error.TestFail(
275                'Test failed: active rule is %s' % self.server.current_rule)
276
277        if disable_check:
278            logging.info('Skipping check of negotiated DHCP lease parameters.')
279        else:
280            self.wait_for_dhcp_propagation()
281            self.check_dhcp_config(dhcp_options)
282
283    def wait_for_dhcp_propagation(self):
284        """
285        Wait for configuration to propagate over dbus to shill.
286        TODO(wiley) Make this event based.  This is pretty sloppy.
287        """
288        time.sleep(0.1)
289
290    def check_dhcp_config(self, dhcp_options):
291        """
292        Compare the DHCP ipconfig with DHCP lease parameters to ensure
293        that the DUT attained the correct values.
294
295        @param dhcp_options dict of properties the DHCP server provided.
296
297        """
298        # The config is what the interface was actually configured with, as
299        # opposed to dhcp_options, which is what the server expected it be
300        # configured with.
301        for attempt in range(IPCONFIG_POLL_COUNT):
302            dhcp_config = self.get_interface_ipconfig(
303                    self.ethernet_pair.peer_interface_name)
304            if dhcp_config is not None:
305                break
306            time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
307        else:
308            raise error.TestFail('Failed to retrieve DHCP ipconfig object '
309                                 'from shill.')
310
311        logging.debug('Got DHCP config: %s', str(dhcp_config))
312        expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP)
313        configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS)
314        if expected_address != configured_address:
315            raise error.TestFail('Interface configured with IP address not '
316                                 'granted by the DHCP server after DHCP '
317                                 'negotiation.  Expected %s but got %s.' %
318                                 (expected_address, configured_address))
319
320        # While DNS related settings only propagate to the system when the
321        # service is marked as the default service, we can still check the
322        # IP address on the interface, since that is set immediately.
323        interface_address = self.ethernet_pair.peer_interface_ip
324        if expected_address != interface_address:
325            raise error.TestFail('shill somehow knew about the proper DHCP '
326                                 'assigned address: %s, but configured the '
327                                 'interface with something completely '
328                                 'different: %s.' %
329                                 (expected_address, interface_address))
330
331        expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS)
332        configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS)
333        if (expected_dns_servers is not None and
334            expected_dns_servers != configured_dns_servers):
335            raise error.TestFail('Expected to be configured with DNS server '
336                                 'list %s, but was configured with %s '
337                                 'instead.' % (expected_dns_servers,
338                                               configured_dns_servers))
339
340        expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME)
341        configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME)
342        if (expected_domain_name is not None and
343            expected_domain_name != configured_domain_name):
344            raise error.TestFail('Expected to be configured with domain '
345                                 'name %s, but got %s instead.' %
346                                 (expected_domain_name, configured_domain_name))
347
348        expected_host_name = dhcp_options.get(dhcp_packet.OPTION_HOST_NAME)
349        configured_host_name = dhcp_config.get(DHCPCD_KEY_ACCEPTED_HOSTNAME)
350        if (expected_host_name is not None and
351            expected_host_name != configured_host_name):
352            raise error.TestFail('Expected to be configured with host '
353                                 'name %s, but got %s instead.' %
354                                 (expected_host_name, configured_host_name))
355
356        expected_search_list = dhcp_options.get(
357                dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST)
358        configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST)
359        if (expected_search_list is not None and
360            expected_search_list != configured_search_list):
361            raise error.TestFail('Expected to be configured with domain '
362                                 'search list %s, but got %s instead.' %
363                                 (expected_search_list, configured_search_list))
364
365        expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS)
366        if (not expected_routers and
367            dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)):
368            classless_static_routes = dhcp_options[
369                dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES]
370            for prefix, destination, gateway in classless_static_routes:
371                if not prefix:
372                    logging.info('Using %s as the default gateway', gateway)
373                    expected_routers = [ gateway ]
374                    break
375        configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY)
376        if expected_routers and expected_routers[0] != configured_router:
377            raise error.TestFail('Expected to be configured with gateway %s, '
378                                 'but got %s instead.' %
379                                 (expected_routers[0], configured_router))
380
381        self.server.wait_for_test_to_finish()
382        if not self.server.last_test_passed:
383            raise error.TestFail('Test server didn\'t get all the messages it '
384                                 'was told to expect for renewal.')
385