1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import time
7
8from autotest_lib.client.bin import test
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib.cros import site_eap_certs
11from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
12from autotest_lib.client.cros import hostapd_server
13from autotest_lib.client.cros import shill_temporary_profile
14from autotest_lib.client.cros.networking import shill_proxy
15
class network_8021xWiredAuthentication(test.test):
    """The 802.1x EAP wired authentication class.

    Runs hostapd on one side of an ethernet pair, and shill on the other.
    Configures the Ethernet service with 802.1x credentials and ensures
    that when shill detects an EAP authenticator, it is successful in
    using its credentials to gain access.

    """
    # Name given to the peer end of the virtual ethernet pair; also the
    # name used to look up the corresponding shill Device.
    INTERFACE_NAME = 'pseudoethernet0'
    # Device property that is read and waited on to decide whether
    # 802.1x authentication has completed.
    AUTHENTICATION_FLAG = 'EapAuthenticationCompleted'
    # Name of the temporary shill profile that is created for the test
    # and popped/pushed during it.
    TEST_PROFILE_NAME = 'test1x'
    # Timeout handed to ShillProxy.wait_for_property_in (presumably
    # seconds -- confirm against ShillProxy).
    AUTHENTICATION_TIMEOUT = 10
    # Seconds to sleep after starting hostapd before checking that it
    # is still running.
    HOSTAPD_STARTUP_DELAY_SECONDS = 1
    version = 1

    def get_device(self, interface_name):
        """Finds the corresponding Device object for an ethernet
        interface with the name |interface_name|.

        @param interface_name string The name of the interface to check.

        @return DBus interface object representing the associated device.

        @raises error.TestFail if shill reports no such device.

        """
        device = self._shill_proxy.find_object('Device',
                                               {'Name': interface_name})
        if device is None:
            raise error.TestFail('Device was not found.')

        return device


    def get_authenticated_flag(self, interface_name):
        """Checks whether |interface_name| has successfully negotiated
        802.1x.

        The full set of device properties is logged at info level to
        help with debugging.

        @param interface_name string The name of the interface to check.

        @return True if the authenticated flag is set, False otherwise.

        @raises error.TestFail if shill reports no such device.

        """
        device = self.get_device(interface_name)
        device_properties = device.GetProperties(utf8_strings=True)
        logging.info('Device properties are %r', device_properties)
        return shill_proxy.ShillProxy.dbus2primitive(
                device_properties[self.AUTHENTICATION_FLAG])


    def wait_for_authentication(self, interface_name):
        """Waits for |interface_name| to report completed authentication.

        Waits, bounded by AUTHENTICATION_TIMEOUT, for the device's
        AUTHENTICATION_FLAG property to become True.

        @param interface_name string The name of the interface to check.

        @return The first element of the tuple returned by
                ShillProxy.wait_for_property_in; callers in this test
                treat a false value as "authentication did not
                complete".

        @raises error.TestFail if shill reports no such device.

        """
        device = self.get_device(interface_name)
        successful, _, _ = self._shill_proxy.wait_for_property_in(
                device,
                self.AUTHENTICATION_FLAG,
                (True,),
                self.AUTHENTICATION_TIMEOUT)
        return successful


    def configure_credentials(self, interface_name):
        """Adds authentication properties to the Ethernet EAP service.

        The EAP type, inner (phase 2) method, identity and password are
        taken from the HostapdServer class constants, and the CA
        certificate from site_eap_certs.

        @param interface_name string The name of the associated interface.
                Currently unused by this method; kept so existing callers
                keep working.

        """
        # The return value of configure_service is not needed here.
        self._shill_proxy.configure_service({
            'Type': 'etherneteap',
            'EAP.EAP': hostapd_server.HostapdServer.EAP_TYPE,
            'EAP.InnerEAP': 'auth=%s' % hostapd_server.HostapdServer.EAP_PHASE2,
            'EAP.Identity': hostapd_server.HostapdServer.EAP_USERNAME,
            'EAP.Password': hostapd_server.HostapdServer.EAP_PASSWORD,
            'EAP.CACertPEM': [site_eap_certs.ca_cert_1],
        })


    def _check_authentication_cycle(self, manager, ethernet_pair, hostapd):
        """Runs the authenticate / log-off / re-authenticate checks.

        Must be called while |hostapd| and |ethernet_pair| are active and
        the temporary test profile has been pushed.

        @param manager The shill manager object, used to pop and push the
                test profile.
        @param ethernet_pair The active VirtualEthernetPair, used to look
                up the client MAC address.
        @param hostapd The active HostapdServer instance.

        @raises error.TestFail if any step does not behave as expected.

        """
        # Wait for hostapd to initialize.
        time.sleep(self.HOSTAPD_STARTUP_DELAY_SECONDS)
        if not hostapd.running():
            raise error.TestFail('hostapd process exited.')

        self.configure_credentials(self.INTERFACE_NAME)
        hostapd.send_eap_packets()
        if not self.wait_for_authentication(self.INTERFACE_NAME):
            raise error.TestFail('Authentication did not complete.')

        # Both sides must agree: shill has set the flag above, and now
        # hostapd must also consider the client authenticated.
        client_mac_address = ethernet_pair.peer_interface_mac
        if not hostapd.client_has_authenticated(client_mac_address):
            raise error.TestFail('Server does not agree that '
                                 'client is authenticated')

        if hostapd.client_has_logged_off(client_mac_address):
            raise error.TestFail('Client has already logged off')

        # Since the EAP credentials are associated with the
        # top-most profile, popping it should cause the client
        # to immediately log-off.
        manager.PopProfile(self.TEST_PROFILE_NAME)

        if self.get_authenticated_flag(self.INTERFACE_NAME):
            raise error.TestFail('Client is still authenticated.')

        if not hostapd.client_has_logged_off(client_mac_address):
            raise error.TestFail('Client did not log off')

        # Re-pushing the profile should make the EAP credentials
        # available again, and should cause the client to
        # re-authenticate.
        manager.PushProfile(self.TEST_PROFILE_NAME)

        if not self.wait_for_authentication(self.INTERFACE_NAME):
            raise error.TestFail('Re-authentication did not '
                                 'complete.')


    def run_once(self):
        """Test main loop.

        Creates a temporary shill profile and a virtual ethernet pair,
        verifies that the device does not start out authenticated, then
        starts hostapd on the other end of the pair and runs the
        authentication checks against it.

        @raises error.TestFail if the environment cannot be set up or any
                authentication check fails.

        """
        self._shill_proxy = shill_proxy.ShillProxy()
        manager = self._shill_proxy.manager

        with shill_temporary_profile.ShillTemporaryProfile(
                manager, profile_name=self.TEST_PROFILE_NAME):
            with virtual_ethernet_pair.VirtualEthernetPair(
                    peer_interface_name=self.INTERFACE_NAME,
                    peer_interface_ip=None) as ethernet_pair:
                if not ethernet_pair.is_healthy:
                    raise error.TestFail('Virtual ethernet pair failed.')

                # Guard against a stale flag that would make the later
                # authentication wait succeed trivially.
                if self.get_authenticated_flag(self.INTERFACE_NAME):
                    raise error.TestFail('Authentication flag already set.')

                with hostapd_server.HostapdServer(
                        interface=ethernet_pair.interface_name) as hostapd:
                    self._check_authentication_cycle(
                            manager, ethernet_pair, hostapd)
151