1# Lint as: python2, python3
2# Copyright 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""
7Base class for DHCPv6 tests.  This class just sets up a little bit of plumbing,
8like a virtual ethernet device with one end that looks like a real ethernet
9device to shill and a DHCPv6 test server on the end that doesn't look like a
10real ethernet interface to shill.  Child classes should override test_body()
11with the logic of their test.
12"""
13
14from __future__ import absolute_import
15from __future__ import division
16from __future__ import print_function
17
18import logging
19from six.moves import filter
20from six.moves import range
21import time
22import traceback
23
24from autotest_lib.client.bin import test
25from autotest_lib.client.common_lib import error
26from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
27from autotest_lib.client.cros import dhcpv6_test_server
28from autotest_lib.client.cros.networking import shill_proxy
29
30# These are keys that may be used with the DBus dictionary returned from
31# Dhcpv6TestBase.get_interface_ipconfig().
32DHCPV6_KEY_ADDRESS = 'Address'
33DHCPV6_KEY_DELEGATED_PREFIX = 'DelegatedPrefix'
34DHCPV6_KEY_DELEGATED_PREFIX_LENGTH = 'DelegatedPrefixLength'
35DHCPV6_KEY_NAMESERVERS = 'NameServers'
36DHCPV6_KEY_SEARCH_DOMAIN_LIST = 'SearchDomains'
37
38# After DHCPv6 completes, an ipconfig should appear shortly after
39IPCONFIG_POLL_COUNT = 5
40IPCONFIG_POLL_PERIOD_SECONDS = 1
41
42class Dhcpv6TestBase(test.test):
43    """Parent class for tests that work verify DHCPv6 behavior."""
44    version = 1
45
46    def get_device(self, interface_name):
47        """Finds the corresponding Device object for an interface with
48        the name |interface_name|.
49
50        @param interface_name string The name of the interface to check.
51
52        @return DBus interface object representing the associated device.
53
54        """
55        return self.shill_proxy.find_object('Device',
56                                            {'Name': interface_name})
57
58
59    def find_ethernet_service(self, interface_name):
60        """Finds the corresponding service object for an Ethernet interface.
61
62        @param interface_name string The name of the associated interface
63
64        @return Service object representing the associated service.
65
66        """
67        device = self.get_device(interface_name)
68        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
69        return self.shill_proxy.find_object('Service', {'Device': device_path})
70
71
72    def get_interface_ipconfig_objects(self, interface_name):
73        """
74        Returns a list of dbus object proxies for |interface_name|.
75        Returns an empty list if no such interface exists.
76
77        @param interface_name string name of the device to query (e.g., "eth0").
78
79        @return list of objects representing DBus IPConfig RPC endpoints.
80
81        """
82        device = self.get_device(interface_name)
83        if device is None:
84            return []
85
86        device_properties = device.GetProperties(utf8_strings=True)
87        proxy = self.shill_proxy
88
89        ipconfig_object = proxy.DBUS_TYPE_IPCONFIG
90        return list(filter(bool,
91                      [ proxy.get_dbus_object(ipconfig_object, property_path)
92                        for property_path in device_properties['IPConfigs'] ]))
93
94
95    def get_interface_ipconfig(self, interface_name):
96        """
97        Returns a dictionary containing settings for an |interface_name| set
98        via DHCPv6.  Returns None if no such interface or setting bundle on
99        that interface can be found in shill.
100
101        @param interface_name string name of the device to query (e.g., "eth0").
102
103        @return dict containing the the properties of the IPConfig stripped
104            of DBus meta-data or None.
105
106        """
107        dhcp_properties = None
108        for ipconfig in self.get_interface_ipconfig_objects(interface_name):
109          logging.info('Looking at ipconfig %r', ipconfig)
110          ipconfig_properties = ipconfig.GetProperties(utf8_strings=True)
111          if 'Method' not in ipconfig_properties:
112              logging.info('Found ipconfig object with no method field')
113              continue
114          if ipconfig_properties['Method'] != 'dhcp6':
115              logging.info('Found ipconfig object with method != dhcp6')
116              continue
117          if dhcp_properties != None:
118              raise error.TestFail('Found multiple ipconfig objects '
119                                   'with method == dhcp6')
120          dhcp_properties = ipconfig_properties
121        if dhcp_properties is None:
122            logging.info('Did not find IPConfig object with method == dhcp6')
123            return None
124        logging.info('Got raw dhcp config dbus object: %s.', dhcp_properties)
125        return shill_proxy.ShillProxy.dbus2primitive(dhcp_properties)
126
127
128    def run_once(self):
129        self._server = None
130        self._server_ip = None
131        self._ethernet_pair = None
132        self._shill_proxy = shill_proxy.ShillProxy()
133        try:
134            # TODO(zqiu): enable DHCPv6 for peer interface, either by restarting
135            # shill with appropriate command line options or via a new DBUS
136            # command.
137            self._ethernet_pair = virtual_ethernet_pair.VirtualEthernetPair(
138                    interface_ip=None,
139                    peer_interface_name='pseudoethernet0',
140                    peer_interface_ip=None,
141                    interface_ipv6=dhcpv6_test_server.DHCPV6_SERVER_ADDRESS)
142            self._ethernet_pair.setup()
143            if not self._ethernet_pair.is_healthy:
144                raise error.TestFail('Could not create virtual ethernet pair.')
145            self._server_ip = self._ethernet_pair.interface_ip
146            self._server = dhcpv6_test_server.Dhcpv6TestServer(
147                    self._ethernet_pair.interface_name)
148            self._server.start()
149            self.test_body()
150        except (error.TestFail, error.TestNAError):
151            # Pass these through without modification.
152            raise
153        except Exception as e:
154            logging.error('Caught exception: %s.', str(e))
155            logging.error('Trace: %s', traceback.format_exc())
156            raise error.TestFail('Caught exception: %s.' % str(e))
157        finally:
158            if self._server is not None:
159                self._server.stop()
160            if self._ethernet_pair is not None:
161                self._ethernet_pair.teardown()
162
163    def test_body(self):
164        """
165        Override this method with the body of your test.  You may safely assume
166        that the the properties exposed by DhcpTestBase correctly return
167        references to the test apparatus.
168        """
169        raise error.TestFail('No test body implemented')
170
171    @property
172    def server_ip(self):
173        """
174        Return the IP address of the side of the interface that the DHCPv6 test
175        server is bound to.  The server itself is bound the the broadcast
176        address on the interface.
177        """
178        return self._server_ip
179
180    @property
181    def server(self):
182        """
183        Returns a reference to the DHCP test server.  Use this to add handlers
184        and run tests.
185        """
186        return self._server
187
188    @property
189    def ethernet_pair(self):
190        """
191        Returns a reference to the virtual ethernet pair created to run DHCP
192        tests on.
193        """
194        return self._ethernet_pair
195
196    @property
197    def shill_proxy(self):
198        """
199        Returns a the shill proxy instance.
200        """
201        return self._shill_proxy
202
203
204    def check_dhcpv6_config(self):
205        """
206        Compare the DHCPv6 ipconfig with DHCP lease parameters to ensure
207        that the DUT attained the correct values.
208
209        """
210        # Retrieve DHCPv6 configuration.
211        for attempt in range(IPCONFIG_POLL_COUNT):
212            dhcpv6_config = self.get_interface_ipconfig(
213                    self.ethernet_pair.peer_interface_name)
214            # Wait until both IP address and delegated prefix are obtained.
215            if (dhcpv6_config is not None and
216                dhcpv6_config.get(DHCPV6_KEY_ADDRESS) and
217                dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)):
218                break;
219            time.sleep(IPCONFIG_POLL_PERIOD_SECONDS)
220        else:
221            raise error.TestFail('Failed to retrieve DHCPv6 ipconfig object '
222                                 'from shill.')
223
224        # Verify Non-temporary Address prefix.
225        address = dhcpv6_config.get(DHCPV6_KEY_ADDRESS)
226        actual_prefix = address[:address.index('::')]
227        expected_prefix = dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX[:
228                dhcpv6_test_server.DHCPV6_SERVER_SUBNET_PREFIX.index('::')]
229        if actual_prefix != expected_prefix:
230            raise error.TestFail('Address prefix mismatch: '
231                                 'actual %s expected %s.' %
232                                 (actual_prefix, expected_prefix))
233        # Verify Non-temporary Address suffix.
234        actual_suffix = int(address[address.index('::')+2:], 16)
235        if (actual_suffix < dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW or
236            actual_suffix > dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH):
237            raise error.TestFail('Invalid address suffix: '
238                                 'actual %x expected (%x-%x)' %
239                                 (actual_suffix,
240                                  dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_LOW,
241                                  dhcpv6_test_server.DHCPV6_ADDRESS_RANGE_HIGH))
242
243        # Verify delegated prefix.
244        delegated_prefix = dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX)
245        for x in range(
246                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_LOW,
247                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_INDEX_HIGH+1):
248            valid_prefix = \
249                    dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_RANGE_FORMAT % x
250            if delegated_prefix == valid_prefix:
251                break;
252        else:
253            raise error.TestFail('Invalid delegated prefix: %s' %
254                                 (delegated_prefix))
255        # Verify delegated prefix length.
256        delegated_prefix_length = \
257                int(dhcpv6_config.get(DHCPV6_KEY_DELEGATED_PREFIX_LENGTH))
258        expected_delegated_prefix_length = \
259                dhcpv6_test_server.DHCPV6_PREFIX_DELEGATION_PREFIX_LENGTH
260        if delegated_prefix_length != expected_delegated_prefix_length:
261            raise error.TestFail('Delegated prefix length mismatch: '
262                                 'actual %d expected %d' %
263                                 (delegated_prefix_length,
264                                  expected_delegated_prefix_length))
265
266        # Verify name servers.
267        actual_name_servers = dhcpv6_config.get(DHCPV6_KEY_NAMESERVERS)
268        expected_name_servers = \
269                dhcpv6_test_server.DHCPV6_NAME_SERVERS.split(',')
270        if actual_name_servers != expected_name_servers:
271            raise error.TestFail('Name servers mismatch: actual %r expected %r'
272                                 % (actual_name_servers, expected_name_servers))
273        # Verify domain search.
274        actual_domain_search = dhcpv6_config.get(DHCPV6_KEY_SEARCH_DOMAIN_LIST)
275        expected_domain_search = \
276                dhcpv6_test_server.DHCPV6_DOMAIN_SEARCH.split(',')
277        if actual_domain_search != expected_domain_search:
278            raise error.TestFail('Domain search list mismatch: '
279                                 'actual %r expected %r' %
280                                 (actual_domain_search, expected_domain_search))
281