1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""
6Base class for DHCP tests.  This class just sets up a little bit of plumbing,
7like a virtual ethernet device with one end that looks like a real ethernet
8device to shill and a DHCP test server on the end that doesn't look like a real
9ethernet interface to shill.  Child classes should override test_body() with the
10logic of their test.  The plumbing of DhcpTestBase is accessible via properties.
11"""
12
13import logging
14import socket
15import struct
16import time
17import traceback
18
19from autotest_lib.client.bin import test
20from autotest_lib.client.common_lib import error
21from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
22from autotest_lib.client.cros import dhcp_handling_rule
23from autotest_lib.client.cros import dhcp_packet
24from autotest_lib.client.cros import dhcp_test_server
25from autotest_lib.client.cros.networking import shill_proxy
26
27
28# These are keys that may be used with the DBus dictionary returned from
29# DhcpTestBase.get_interface_ipconfig().
30DHCPCD_KEY_NAMESERVERS = 'NameServers'
31DHCPCD_KEY_GATEWAY = 'Gateway'
32DHCPCD_KEY_BROADCAST_ADDR = 'Broadcast'
33DHCPCD_KEY_ADDRESS = 'Address'
34DHCPCD_KEY_PREFIX_LENGTH = 'Prefixlen'
35DHCPCD_KEY_DOMAIN_NAME = 'DomainName'
36DHCPCD_KEY_ACCEPTED_HOSTNAME = 'AcceptedHostname'
37DHCPCD_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'
38
39# We should be able to complete a DHCP negotiation in this amount of time.
40DHCP_NEGOTIATION_TIMEOUT_SECONDS = 10
41
42# After DHCP completes, an ipconfig should appear shortly after
43IPCONFIG_POLL_COUNT = 5
44IPCONFIG_POLL_PERIOD_SECONDS = 0.5
45
46class DhcpTestBase(test.test):
47    """Parent class for tests that work verify DHCP behavior."""
48    version = 1
49
50    @staticmethod
51    def rewrite_ip_suffix(subnet_mask, ip_in_subnet, ip_suffix):
52        """
53        Create a new IPv4 address in a subnet by bitwise and'ing an existing
54        address |ip_in_subnet| with |subnet_mask| and bitwise or'ing in
55        |ip_suffix|.  For safety, bitwise or the suffix with the complement of
56        the subnet mask.
57
58        Usage: rewrite_ip_suffix("255.255.255.0", "192.168.1.1", "0.0.0.105")
59
60        The example usage will return "192.168.1.105".
61
62        @param subnet_mask string subnet mask, e.g. "255.255.255.0"
63        @param ip_in_subnet string an IP address in the desired subnet
64        @param ip_suffix string suffix desired for new address, e.g. "0.0.0.105"
65
66        @return string IP address on in the same subnet with specified suffix.
67
68        """
69        mask = struct.unpack('!I', socket.inet_aton(subnet_mask))[0]
70        subnet = mask & struct.unpack('!I', socket.inet_aton(ip_in_subnet))[0]
71        suffix = ~mask & struct.unpack('!I', socket.inet_aton(ip_suffix))[0]
72        return socket.inet_ntoa(struct.pack('!I', (subnet | suffix)))
73
74
75    def get_device(self, interface_name):
76        """Finds the corresponding Device object for an interface with
77        the name |interface_name|.
78
79        @param interface_name string The name of the interface to check.
80
81        @return DBus interface object representing the associated device.
82
83        """
84        return self.shill_proxy.find_object('Device',
85                                            {'Name': interface_name})
86
87
88    def find_ethernet_service(self, interface_name):
89        """Finds the corresponding service object for an Ethernet interface.
90
91        @param interface_name string The name of the associated interface
92
93        @return Service object representing the associated service.
94
95        """
96        device = self.get_device(interface_name)
97        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
98        return self.shill_proxy.find_object('Service', {'Device': device_path})
99
100
101    def get_interface_ipconfig_objects(self, interface_name):
102        """
103        Returns a list of dbus object proxies for |interface_name|.
104        Returns an empty list if no such interface exists.
105
106        @param interface_name string name of the device to query (e.g., "eth0").
107
108        @return list of objects representing DBus IPConfig RPC endpoints.
109
110        """
111        device = self.get_device(interface_name)
112        if device is None:
113            return []
114
115        device_properties = device.GetProperties(utf8_strings=True)
116        proxy = self.shill_proxy
117
118        ipconfig_object = proxy.DBUS_TYPE_IPCONFIG
119        return filter(bool,
120                      [ proxy.get_dbus_object(ipconfig_object, property_path)
121                        for property_path in device_properties['IPConfigs'] ])
122
123
124    def get_interface_ipconfig(self, interface_name):
125        """
126        Returns a dictionary containing settings for an |interface_name| set
127        via DHCP.  Returns None if no such interface or setting bundle on
128        that interface can be found in shill.
129
130        @param interface_name string name of the device to query (e.g., "eth0").
131
132        @return dict containing the the properties of the IPConfig stripped
133            of DBus meta-data or None.
134
135        """
136        dhcp_properties = None
137        for ipconfig in self.get_interface_ipconfig_objects(interface_name):
138          logging.info('Looking at ipconfig %r', ipconfig)
139          ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
140          if 'Method' not in ipconfig_properties:
141              logging.info('Found ipconfig object with no method field')
142              continue
143          if ipconfig_properties['Method'] != 'dhcp':
144              logging.info('Found ipconfig object with method != dhcp')
145              continue
146          if dhcp_properties != None:
147              raise error.TestFail('Found multiple ipconfig objects '
148                                   'with method == dhcp')
149          dhcp_properties = ipconfig_properties
150        if dhcp_properties is None:
151            logging.info('Did not find IPConfig object with method == dhcp')
152            return None
153        logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
154        return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)
155
156
157    def run_once(self):
158        self._server = None
159        self._server_ip = None
160        self._ethernet_pair = None
161        self._server = None
162        self._shill_proxy = shill_proxy.ShillProxy()
163        try:
164            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
165                    peer_interface_name='pseudoethernet0',
166                    peer_interface_ip=None)
167            self._ethernet_pair.setup()
168            if not self._ethernet_pair.is_healthy:
169                raise error.TestFail('Could not create virtual ethernet pair.')
170            self._server_ip = self._ethernet_pair.interface_ip
171            self._server = dhcp_test_server.DhcpTestServer(
172                    self._ethernet_pair.interface_name)
173            self._server.start()
174            if not self._server.is_healthy:
175                raise error.TestFail('Could not start DHCP test server.')
176            self._subnet_mask = self._ethernet_pair.interface_subnet_mask
177            self.test_body()
178        except (error.TestFail, error.TestNAError):
179            # Pass these through without modification.
180            raise
181        except Exception as e:
182            logging.error('Caught exception: %s.', str(e))
183            logging.error('Trace: %s', traceback.format_exc())
184            raise error.TestFail('Caught exception: %s.' % str(e))
185        finally:
186            if self._server is not None:
187                self._server.stop()
188            if self._ethernet_pair is not None:
189                self._ethernet_pair.teardown()
190
191    def test_body(self):
192        """
193        Override this method with the body of your test.  You may safely assume
194        that the the properties exposed by DhcpTestBase correctly return
195        references to the test apparatus.
196        """
197        raise error.TestFail('No test body implemented')
198
199    @property
200    def server_ip(self):
201        """
202        Return the IP address of the side of the interface that the DHCP test
203        server is bound to.  The server itself is bound the the broadcast
204        address on the interface.
205        """
206        return self._server_ip
207
208    @property
209    def server(self):
210        """
211        Returns a reference to the DHCP test server.  Use this to add handlers
212        and run tests.
213        """
214        return self._server
215
216    @property
217    def ethernet_pair(self):
218        """
219        Returns a reference to the virtual ethernet pair created to run DHCP
220        tests on.
221        """
222        return self._ethernet_pair
223
224    @property
225    def shill_proxy(self):
226        """
227        Returns a the shill proxy instance.
228        """
229        return self._shill_proxy
230
231    def negotiate_and_check_lease(self,
232                                  dhcp_options,
233                                  custom_fields={},
234                                  disable_check=False):
235        """
236        Perform DHCP lease negotiation, and ensure that the resulting
237        ipconfig matches the DHCP options provided to the server.
238
239        @param dhcp_options dict of properties the DHCP server should provide.
240        @param custom_fields dict of custom DHCP parameters to add to server.
241        @param disable_check bool whether to perform IPConfig parameter
242             checking.
243
244        """
245        if dhcp_packet.OPTION_REQUESTED_IP not in dhcp_options:
246            raise error.TestFail('You must specify OPTION_REQUESTED_IP to '
247                                 'negotiate a DHCP lease')
248        intended_ip = dhcp_options[dhcp_packet.OPTION_REQUESTED_IP]
249        # Build up the handling rules for the server and start the test.
250        rules = []
251        rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToDiscovery(
252                intended_ip,
253                self.server_ip,
254                dhcp_options,
255                custom_fields))
256        rules.append(dhcp_handling_rule.DhcpHandlingRule_RespondToRequest(
257                intended_ip,
258                self.server_ip,
259                dhcp_options,
260                custom_fields))
261        rules[-1].is_final_handler = True
262        self.server.start_test(rules, DHCP_NEGOTIATION_TIMEOUT_SECONDS)
263        logging.info('Server is negotiating new lease with options: %s',
264                     dhcp_options)
265        self.server.wait_for_test_to_finish()
266        if not self.server.last_test_passed:
267            raise error.TestFail(
268                'Test failed: active rule is %s' % self.server.current_rule)
269
270        if disable_check:
271            logging.info('Skipping check of negotiated DHCP lease parameters.')
272        else:
273            self.wait_for_dhcp_propagation()
274            self.check_dhcp_config(dhcp_options)
275
276    def wait_for_dhcp_propagation(self):
277        """
278        Wait for configuration to propagate over dbus to shill.
279        TODO(wiley) Make this event based.  This is pretty sloppy.
280        """
281        time.sleep(0.1)
282
283    def check_dhcp_config(self, dhcp_options):
284        """
285        Compare the DHCP ipconfig with DHCP lease parameters to ensure
286        that the DUT attained the correct values.
287
288        @param dhcp_options dict of properties the DHCP server provided.
289
290        """
291        # The config is what the interface was actually configured with, as
292        # opposed to dhcp_options, which is what the server expected it be
293        # configured with.
294        for attempt in range(IPCONFIG_POLL_COUNT):
295            dhcp_config = self.get_interface_ipconfig(
296                    self.ethernet_pair.peer_interface_name)
297            if dhcp_config is not None:
298                break
299            time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
300        else:
301            raise error.TestFail('Failed to retrieve DHCP ipconfig object '
302                                 'from shill.')
303
304        logging.debug('Got DHCP config: %s', str(dhcp_config))
305        expected_address = dhcp_options.get(dhcp_packet.OPTION_REQUESTED_IP)
306        configured_address = dhcp_config.get(DHCPCD_KEY_ADDRESS)
307        if expected_address != configured_address:
308            raise error.TestFail('Interface configured with IP address not '
309                                 'granted by the DHCP server after DHCP '
310                                 'negotiation.  Expected %s but got %s.' %
311                                 (expected_address, configured_address))
312
313        # While DNS related settings only propagate to the system when the
314        # service is marked as the default service, we can still check the
315        # IP address on the interface, since that is set immediately.
316        interface_address = self.ethernet_pair.peer_interface_ip
317        if expected_address != interface_address:
318            raise error.TestFail('shill somehow knew about the proper DHCP '
319                                 'assigned address: %s, but configured the '
320                                 'interface with something completely '
321                                 'different: %s.' %
322                                 (expected_address, interface_address))
323
324        expected_dns_servers = dhcp_options.get(dhcp_packet.OPTION_DNS_SERVERS)
325        configured_dns_servers = dhcp_config.get(DHCPCD_KEY_NAMESERVERS)
326        if (expected_dns_servers is not None and
327            expected_dns_servers != configured_dns_servers):
328            raise error.TestFail('Expected to be configured with DNS server '
329                                 'list %s, but was configured with %s '
330                                 'instead.' % (expected_dns_servers,
331                                               configured_dns_servers))
332
333        expected_domain_name = dhcp_options.get(dhcp_packet.OPTION_DOMAIN_NAME)
334        configured_domain_name = dhcp_config.get(DHCPCD_KEY_DOMAIN_NAME)
335        if (expected_domain_name is not None and
336            expected_domain_name != configured_domain_name):
337            raise error.TestFail('Expected to be configured with domain '
338                                 'name %s, but got %s instead.' %
339                                 (expected_domain_name, configured_domain_name))
340
341        expected_host_name = dhcp_options.get(dhcp_packet.OPTION_HOST_NAME)
342        configured_host_name = dhcp_config.get(DHCPCD_KEY_ACCEPTED_HOSTNAME)
343        if (expected_host_name is not None and
344            expected_host_name != configured_host_name):
345            raise error.TestFail('Expected to be configured with host '
346                                 'name %s, but got %s instead.' %
347                                 (expected_host_name, configured_host_name))
348
349        expected_search_list = dhcp_options.get(
350                dhcp_packet.OPTION_DNS_DOMAIN_SEARCH_LIST)
351        configured_search_list = dhcp_config.get(DHCPCD_KEY_SEARCH_DOMAIN_LIST)
352        if (expected_search_list is not None and
353            expected_search_list != configured_search_list):
354            raise error.TestFail('Expected to be configured with domain '
355                                 'search list %s, but got %s instead.' %
356                                 (expected_search_list, configured_search_list))
357
358        expected_routers = dhcp_options.get(dhcp_packet.OPTION_ROUTERS)
359        if (not expected_routers and
360            dhcp_options.get(dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES)):
361            classless_static_routes = dhcp_options[
362                dhcp_packet.OPTION_CLASSLESS_STATIC_ROUTES]
363            for prefix, destination, gateway in classless_static_routes:
364                if not prefix:
365                    logging.info('Using %s as the default gateway', gateway)
366                    expected_routers = [ gateway ]
367                    break
368        configured_router = dhcp_config.get(DHCPCD_KEY_GATEWAY)
369        if expected_routers and expected_routers[0] != configured_router:
370            raise error.TestFail('Expected to be configured with gateway %s, '
371                                 'but got %s instead.' %
372                                 (expected_routers[0], configured_router))
373
374        self.server.wait_for_test_to_finish()
375        if not self.server.last_test_passed:
376            raise error.TestFail('Test server didn\'t get all the messages it '
377                                 'was told to expect for renewal.')
378