1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import time
7
8from autotest_lib.client.common_lib import error
9from autotest_lib.client.common_lib import utils
10from autotest_lib.client.cros import dhcp_test_base
11from autotest_lib.client.cros import radvd_server
12from autotest_lib.client.cros.networking import shill_proxy
13
14class network_Ipv6SimpleNegotiation(dhcp_test_base.DhcpTestBase):
15    """
16    The test subclass that implements IPv6 negotiation.  This test
17    starts an IPv6 router, then performs a series of tests on the
18    IPv6 addresses and IPv6 DNS addresses that the DUT should have
19    gained.
20    """
21
22    def _get_ip6_addresses(self):
23        """Gets the list of client IPv6 addresses.
24
25        Retrieve IPv6 addresses associated with the "client side" of the
26        pseudo-interface pair.  Returns a dict keyed by the IPv6 address,
27        with the values being the array of attribute strings that follow
28        int the "ip addr show" output.  For example, a line containing:
29
30            inet6 fe80::ae16:2dff:fe01:0203/64 scope link
31
32        will turn into a dict key:
33
34            'fe80::ae16:2dff:fe01:0203/64': [ 'scope', 'link' ]
35
36        """
37        addr_output = utils.system_output(
38            "ip -6 addr show dev %s" % self.ethernet_pair.peer_interface_name)
39        addresses = {}
40        for line in addr_output.splitlines():
41            parts = line.lstrip().split()
42            if parts[0] != 'inet6' or 'deprecated' in parts:
43                continue
44            addresses[parts[1]] = parts[2:]
45        return addresses
46
47
48    def _get_link_address(self):
49        """Get the client MAC address.
50
51        Retrieve the MAC address associated with the "client side" of the
52        pseudo-interface pair.  For example, the "ip link show" output:
53
54            link/ether 01:02:03:04:05:05 brd ff:ff:ff:ff:ff:ff
55
56        will cause a return of "01:02:03:04:05:05"
57
58        """
59        addr_output = utils.system_output(
60            'ip link show %s' % self.ethernet_pair.peer_interface_name)
61        for line in addr_output.splitlines():
62            parts = line.lstrip().split(' ')
63            if parts[0] == 'link/ether':
64                return parts[1]
65
66
67    def _get_ipconfig_properties(self):
68        for ipconfig in self.get_interface_ipconfig_objects(
69                self.ethernet_pair.peer_interface_name):
70            ipconfig_properties = shill_proxy.ShillProxy.dbus2primitive(
71                    ipconfig.GetProperties(utf8_strings=True))
72            if 'Method' not in ipconfig_properties:
73                continue
74
75            if ipconfig_properties['Method'] != 'ipv6':
76                continue
77
78            return ipconfig_properties
79        else:
80            raise error.TestError('Found no IPv6 IPConfig entries')
81
82
83    def verify_ipv6_addresses(self):
84        """Verify IPv6 configuration.
85
86        Perform various tests to validate the IPv6 addresses acquired by
87        the client.
88
89        """
90        addresses = self._get_ip6_addresses()
91        logging.info('Got addresses %r', addresses)
92        global_addresses = [key for key in addresses
93                            if 'global' in addresses[key]]
94
95        if len(global_addresses) != 2:
96            raise error.TestError('Expected 2 global address but got %d' %
97                                  len(global_addresses))
98
99        prefix = radvd_server.RADVD_DEFAULT_PREFIX
100        prefix = prefix[:prefix.index('::')]
101        for address in global_addresses:
102            if not address.startswith(prefix):
103                raise error.TestError('Global address %s does not start with '
104                                      'expected prefix %s' %
105                                      address, prefix)
106
107        # One globally scoped address should be based on the last 3 octets
108        # of the MAC adddress, while the other should not.  For example,
109        # for MAC address "01:02:03:04:05:06", we should see an address
110        # that ends with "4:506/64" (the "/64" is the default radvd suffix).
111        link_parts = [int(b, 16) for b in self._get_link_address().split(':')]
112        address_suffix = '%x:%x%s' % (link_parts[3],
113                                      (link_parts[4] << 8) | link_parts[5],
114                                      radvd_server.RADVD_DEFAULT_SUFFIX)
115        mac_related_addresses = [addr for addr in global_addresses
116                                 if addr.endswith(address_suffix)]
117        if len(mac_related_addresses) != 1:
118            raise error.TestError('Expected 1 mac-related global address but '
119                                  'got %d' % len(mac_related_addresses))
120        mac_related_address = mac_related_addresses[0]
121
122        local_address_count = len(addresses) - len(global_addresses)
123        if local_address_count <= 0:
124            raise error.TestError('Expected at least 1 non-global address but '
125                                  'got %d' % local_address_count)
126
127        temporary_address = [addr for addr in global_addresses
128                                 if addr != mac_related_address][0]
129        self.verify_ipconfig_contains(temporary_address)
130
131
132    def verify_ipconfig_contains(self, address_and_prefix):
133        """Verify that shill has an IPConfig entry with the specified address.
134
135        @param address_and_prefix string with address/prefix to search for.
136
137        """
138        address, prefix_str = address_and_prefix.split('/')
139        prefix = int(prefix_str)
140        ipconfig_properties = self._get_ipconfig_properties()
141
142        for property, value in (('Address', address), ('Prefixlen', prefix)):
143            if property not in ipconfig_properties:
144               raise error.TestError('IPv6 IPConfig entry does not '
145                                     'contain property %s' % property)
146            if ipconfig_properties[property] != value:
147               raise error.TestError('IPv6 IPConfig property %s does not '
148                                     'contain the expected value %s; '
149                                     'instead it is %s' %
150                                     (property, value,
151                                      ipconfig_properties[property]))
152
153
154    def verify_ipconfig_name_servers(self, name_servers):
155        """Verify that shill has an IPConfig entry with the specified name
156        servers.
157
158        @param name_servers list of expected name servers.
159
160        """
161        ipconfig_properties = self._get_ipconfig_properties()
162
163        if ipconfig_properties['NameServers'] != name_servers:
164            raise error.TestError('IPv6 name servers mismatched: '
165                                  'expected %r actual %r' %
166                                  name_servers,
167                                  ipconfig_properties['NameServers'])
168
169
170    def test_body(self):
171        """The main body for this test."""
172        server = radvd_server.RadvdServer(self.ethernet_pair.interface_name)
173        server.start_server()
174
175        try:
176            # Wait for IPv6 negotiation to complete.
177            time.sleep(radvd_server.RADVD_DEFAULT_MAX_ADV_INTERVAL)
178
179            # In this time, we should have also acquired an IPv6 address.
180            self.verify_ipv6_addresses()
181            self.verify_ipconfig_name_servers(
182                    radvd_server.RADVD_DEFAULT_RDNSS_SERVERS.split(' '))
183        finally:
184            server.stop_server()
185