1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""
6Base class for DHCPv6 tests.  This class just sets up a little bit of plumbing,
7like a virtual ethernet device with one end that looks like a real ethernet
8device to shill and a DHCPv6 test server on the end that doesn't look like a
9real ethernet interface to shill.  Child classes should override test_body()
10with the logic of their test.
11"""
12
13import logging
14import time
15import traceback
16
17from autotest_lib.client.bin import test
18from autotest_lib.client.common_lib import error
19from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
20from autotest_lib.client.cros import dhcpv6_test_server
21from autotest_lib.client.cros.networking import shill_proxy
22
# Keys of the dictionary returned from
# Dhcpv6TestBase.get_interface_ipconfig().  Dhcpv6TestBase.check_dhcpv6_config()
# looks these up to validate the configuration reported by shill.
DHCPV6_KEY_ADDRESS = 'Address'
DHCPV6_KEY_DELEGATED_PREFIX = 'DelegatedPrefix'
DHCPV6_KEY_DELEGATED_PREFIX_LENGTH = 'DelegatedPrefixLength'
DHCPV6_KEY_NAMESERVERS = 'NameServers'
DHCPV6_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'

# After DHCPv6 completes, an ipconfig should appear shortly afterwards.
# Dhcpv6TestBase.check_dhcpv6_config() queries shill up to IPCONFIG_POLL_COUNT
# times, sleeping IPCONFIG_POLL_PERIOD_SECONDS after each unsuccessful attempt.
IPCONFIG_POLL_COUNT = 5
IPCONFIG_POLL_PERIOD_SECONDS = 1
34
class Dhcpv6TestBase(test.test):
    """Parent class for tests that verify DHCPv6 behavior.

    run_once() creates a virtual ethernet pair, starts a DHCPv6 test server
    on one end of it and then calls test_body(), which child classes override
    with the logic of their test.
    """
    version = 1

    def get_device(self, interface_name):
        """Finds the corresponding Device object for an interface with
        the name |interface_name|.

        @param interface_name string The name of the interface to check.

        @return DBus interface object representing the associated device, as
            returned by the shill proxy's find_object().  Callers in this
            class treat a None result as "no such device".

        """
        return self.shill_proxy.find_object('Device',
                                            {'Name': interface_name})


    def find_ethernet_service(self, interface_name):
        """Finds the corresponding service object for an Ethernet interface.

        @param interface_name string The name of the associated interface

        @return Service object representing the associated service.

        """
        device = self.get_device(interface_name)
        # NOTE(review): unlike get_interface_ipconfig_objects(), a missing
        # device is not checked for here; a None |device| raises
        # AttributeError on the next line.
        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
        return self.shill_proxy.find_object('Service', {'Device': device_path})


    def get_interface_ipconfig_objects(self, interface_name):
        """
        Returns a list of dbus object proxies for |interface_name|.
        Returns an empty list if no such interface exists.

        @param interface_name string name of the device to query (e.g., "eth0").

        @return list of objects representing DBus IPConfig RPC endpoints.

        """
        device = self.get_device(interface_name)
        if device is None:
            return []

        device_properties = device.GetProperties(utf8_strings=True)
        proxy = self.shill_proxy
        ipconfig_type = proxy.DBUS_TYPE_IPCONFIG
        ipconfig_objects = [
                proxy.get_dbus_object(ipconfig_type, property_path)
                for property_path in device_properties['IPConfigs']]
        # Drop falsy lookups and return a real list: filter() would hand back
        # a lazy iterator on Python 3, contradicting the documented return
        # type.
        return [ipconfig for ipconfig in ipconfig_objects if ipconfig]


    def get_interface_ipconfig(self, interface_name):
        """
        Returns a dictionary containing settings for an |interface_name| set
        via DHCPv6.  Returns None if no such interface or setting bundle on
        that interface can be found in shill.

        @param interface_name string name of the device to query (e.g., "eth0").

        @return dict containing the properties of the IPConfig stripped
            of DBus meta-data or None.

        @raises error.TestFail if more than one IPConfig object reports the
            method 'dhcp6'.

        """
        dhcp_properties = None
        for ipconfig in self.get_interface_ipconfig_objects(interface_name):
            logging.info('Looking at ipconfig %r', ipconfig)
            ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
            if 'Method' not in ipconfig_properties:
                logging.info('Found ipconfig object with no method field')
                continue
            if ipconfig_properties['Method'] != 'dhcp6':
                logging.info('Found ipconfig object with method != dhcp6')
                continue
            # Keep scanning after the first match so that duplicates are
            # detected instead of silently ignored.
            if dhcp_properties is not None:
                raise error.TestFail('Found multiple ipconfig objects '
                                     'with method == dhcp6')
            dhcp_properties = ipconfig_properties
        if dhcp_properties is None:
            logging.info('Did not find IPConfig object with method == dhcp6')
            return None
        logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
        return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)


    def run_once(self):
        """
        Sets up the test apparatus, runs test_body() and cleans up.

        Creates the shill proxy, the virtual ethernet pair and the DHCPv6
        test server, then calls test_body().  The server is stopped and the
        ethernet pair is torn down afterwards, whether or not the test passed.

        @raises error.TestFail if the ethernet pair is unhealthy or if any
            exception other than TestFail/TestNAError escapes the test.
        @raises error.TestNAError passed through unmodified.

        """
        self._server = None
        self._server_ip = None
        self._ethernet_pair = None
        self._shill_proxy = shill_proxy.ShillProxy()
        try:
            # TODO(zqiu): enable DHCPv6 for peer interface, either by restarting
            # shill with appropriate command line options or via a new DBUS
            # command.
            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
                    interface_ip=None,
                    peer_interface_name='pseudoethernet0',
                    peer_interface_ip=None,
                    interface_ipv6=dhcpv6_test_server.DHCPV6_SERVER_ADDRESS)
            self._ethernet_pair.setup()
            if not self._ethernet_pair.is_healthy:
                raise error.TestFail('Could not create virtual ethernet pair.')
            self._server_ip = self._ethernet_pair.interface_ip
            self._server = dhcpv6_test_server.Dhcpv6TestServer(
                    self._ethernet_pair.interface_name)
            self._server.start()
            self.test_body()
        except (error.TestFail, error.TestNAError):
            # Pass these through without modification.
            raise
        except Exception as e:
            logging.error('Caught exception: %s.', str(e))
            logging.error('Trace: %s', traceback.format_exc())
            raise error.TestFail('Caught exception: %s.' % str(e))
        finally:
            # Tear the pair down even if stopping the server raises, so that
            # a failing stop() cannot leak the virtual interfaces.
            try:
                if self._server is not None:
                    self._server.stop()
            finally:
                if self._ethernet_pair is not None:
                    self._ethernet_pair.teardown()


    def test_body(self):
        """
        Override this method with the body of your test.  You may safely assume
        that the properties exposed by Dhcpv6TestBase correctly return
        references to the test apparatus.

        @raises error.TestFail always, unless overridden by a child class.

        """
        raise error.TestFail('No test body implemented')


    @property
    def server_ip(self):
        """
        Return the IP address of the side of the interface that the DHCPv6 test
        server is bound to.  The server itself is bound to the broadcast
        address on the interface.
        """
        return self._server_ip


    @property
    def server(self):
        """
        Returns a reference to the DHCP test server.  Use this to add handlers
        and run tests.
        """
        return self._server


    @property
    def ethernet_pair(self):
        """
        Returns a reference to the virtual ethernet pair created to run DHCP
        tests on.
        """
        return self._ethernet_pair


    @property
    def shill_proxy(self):
        """
        Returns the shill proxy instance.
        """
        return self._shill_proxy


    def check_dhcpv6_config(self):
        """
        Compare the DHCPv6 ipconfig with DHCP lease parameters to ensure
        that the DUT attained the correct values.

        Checks, in order: the address prefix and suffix, the delegated prefix
        and its length, the name servers and the domain search list.

        @raises error.TestFail if no ipconfig with both an address and a
            delegated prefix shows up within the polling budget, or if any
            of the checked values does not match the test server's settings.

        """
        # Retrieve DHCPv6 configuration.
        for _ in range(IPCONFIG_POLL_COUNT):
            dhcpv6_config = self.get_interface_ipconfig(
                    self.ethernet_pair.peer_interface_name)
            # Wait until both IP address and delegated prefix are obtained.
            if (dhcpv6_config is not None and
                dhcpv6_config.get(DHCPV6_KEY_ADDRESS) and
                dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)):
                break
            time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
        else:
            raise error.TestFail('Failed to retrieve DHCPv6 ipconfig object '
                                 'from shill.')

        # Verify Non-temporary Address prefix.  Both the address and the
        # expected subnet prefix are split at their '::'; str.index() raises
        # ValueError if that separator is missing.
        address = dhcpv6_config.get(DHCPV6_KEY_ADDRESS)
        separator_index = address.index('::')
        actual_prefix = address[:separator_index]
        subnet_prefix = dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX
        expected_prefix = subnet_prefix[:subnet_prefix.index('::')]
        if actual_prefix != expected_prefix:
            raise error.TestFail('Address prefix mismatch: '
                                 'actual %s expected %s.' %
                                 (actual_prefix, expected_prefix))
        # Verify Non-temporary Address suffix: everything after the '::' is
        # parsed as a single hexadecimal number.
        actual_suffix = int(address[separator_index + 2:], 16)
        if (actual_suffix < dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW or
            actual_suffix > dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH):
            raise error.TestFail('Invalid address suffix: '
                                 'actual %x expected (%x-%x)' %
                                 (actual_suffix,
                                  dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW,
                                  dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH))

        # Verify delegated prefix: it must equal the range format filled in
        # with one of the indices in [INDEX_LOW, INDEX_HIGH].
        delegated_prefix = dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)
        prefix_format = (
                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_RANGE_FORMAT)
        valid_prefixes = [
                prefix_format % index
                for index in range(
                        dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_LOW,
                        dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_HIGH
                        + 1)]
        if delegated_prefix not in valid_prefixes:
            raise error.TestFail('Invalid delegated prefix: %s' %
                                 (delegated_prefix,))
        # Verify delegated prefix length.
        delegated_prefix_length = int(
                dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX_LENGTH))
        expected_delegated_prefix_length = (
                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_PREFIX_LENGTH)
        if delegated_prefix_length != expected_delegated_prefix_length:
            raise error.TestFail('Delegated prefix length mismatch: '
                                 'actual %d expected %d' %
                                 (delegated_prefix_length,
                                  expected_delegated_prefix_length))

        # Verify name servers.  The server-side setting is a comma-separated
        # string; the ipconfig value is compared against the split list.
        actual_name_servers = dhcpv6_config.get(DHCPV6_KEY_NAMESERVERS)
        expected_name_servers = (
                dhcpv6_test_server.DHCPV6_NAME_SERVERS.split(','))
        if actual_name_servers != expected_name_servers:
            raise error.TestFail('Name servers mismatch: actual %r expected %r'
                                 % (actual_name_servers, expected_name_servers))
        # Verify domain search.
        actual_domain_search = dhcpv6_config.get(DHCPV6_KEY_SEARCH_DOMAIN_LIST)
        expected_domain_search = (
                dhcpv6_test_server.DHCPV6_DOMAIN_SEARCH.split(','))
        if actual_domain_search != expected_domain_search:
            raise error.TestFail('Domain search list mismatch: '
                                 'actual %r expected %r' %
                                 (actual_domain_search, expected_domain_search))
274