1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import time 7 8from autotest_lib.client.bin import test 9from autotest_lib.client.common_lib import error 10from autotest_lib.client.common_lib.cros import site_eap_certs 11from autotest_lib.client.common_lib.cros import virtual_ethernet_pair 12from autotest_lib.client.cros import hostapd_server 13from autotest_lib.client.cros import shill_temporary_profile 14from autotest_lib.client.cros.networking import shill_proxy 15 16class network_8021xWiredAuthentication(test.test): 17 """The 802.1x EAP wired authentication class. 18 19 Runs hostapd on one side of an ethernet pair, and shill on the other. 20 Configures the Ethernet service with 802.1x credentials and ensures 21 that when shill detects an EAP authenticator, it is successful in 22 using its credentials to gain access. 23 24 """ 25 INTERFACE_NAME = 'pseudoethernet0' 26 AUTHENTICATION_FLAG = 'EapAuthenticationCompleted' 27 TEST_PROFILE_NAME = 'test1x' 28 AUTHENTICATION_TIMEOUT = 10 29 version = 1 30 31 def get_device(self, interface_name): 32 """Finds the corresponding Device object for an ethernet 33 interface with the name |interface_name|. 34 35 @param interface_name string The name of the interface to check. 36 37 @return DBus interface object representing the associated device. 38 39 """ 40 device = self._shill_proxy.find_object('Device', 41 {'Name': interface_name}) 42 if device is None: 43 raise error.TestFail('Device was not found.') 44 45 return device 46 47 48 def get_authenticated_flag(self, interface_name): 49 """Checks whether |interface_name| has successfully negotiated 50 802.1x. 51 52 @param interface_name string The name of the interface to check. 53 54 @return True if the authenticated flag is set, False otherwise. 55 56 """ 57 device = self.get_device(interface_name) 58 device_properties = device.GetProperties(utf8_strings=True) 59 logging.info('Device properties are %r', device_properties) 60 return shill_proxy.ShillProxy.dbus2primitive( 61 device_properties[self.AUTHENTICATION_FLAG]) 62 63 64 def wait_for_authentication(self, interface_name): 65 """Wait for |interface_name| to get to enter authentication state. 66 67 @param interface_name string The name of the interface to check. 68 69 """ 70 device = self.get_device(interface_name) 71 result = self._shill_proxy.wait_for_property_in( 72 device, 73 self.AUTHENTICATION_FLAG, 74 (True,), 75 self.AUTHENTICATION_TIMEOUT) 76 (successful, _, _) = result 77 return successful 78 79 80 def configure_credentials(self, interface_name): 81 """Adds authentication properties to the Ethernet EAP service. 82 83 @param interface_name string The name of the associated interface 84 85 """ 86 service = self._shill_proxy.configure_service({ 87 'Type': 'etherneteap', 88 'EAP.EAP': hostapd_server.HostapdServer.EAP_TYPE, 89 'EAP.InnerEAP': 'auth=%s' % hostapd_server.HostapdServer.EAP_PHASE2, 90 'EAP.Identity': hostapd_server.HostapdServer.EAP_USERNAME, 91 'EAP.Password': hostapd_server.HostapdServer.EAP_PASSWORD, 92 'EAP.CACertPEM': [ site_eap_certs.ca_cert_1 ] 93 }) 94 95 96 def run_once(self): 97 """Test main loop.""" 98 self._shill_proxy = shill_proxy.ShillProxy() 99 manager = self._shill_proxy.manager 100 101 with shill_temporary_profile.ShillTemporaryProfile( 102 manager, profile_name=self.TEST_PROFILE_NAME): 103 with virtual_ethernet_pair.VirtualEthernetPair( 104 peer_interface_name=self.INTERFACE_NAME, 105 peer_interface_ip=None) as ethernet_pair: 106 if not ethernet_pair.is_healthy: 107 raise error.TestFail('Virtual ethernet pair failed.') 108 109 if self.get_authenticated_flag(self.INTERFACE_NAME): 110 raise error.TestFail('Authentication flag already set.') 111 112 with hostapd_server.HostapdServer( 113 interface=ethernet_pair.interface_name) as hostapd: 114 # Wait for hostapd to initialize. 115 time.sleep(1) 116 if not hostapd.running(): 117 raise error.TestFail('hostapd process exited.') 118 119 self.configure_credentials(self.INTERFACE_NAME) 120 hostapd.send_eap_packets() 121 if not self.wait_for_authentication(self.INTERFACE_NAME): 122 raise error.TestFail('Authentication did not complete.') 123 124 client_mac_address = ethernet_pair.peer_interface_mac 125 if not hostapd.client_has_authenticated(client_mac_address): 126 raise error.TestFail('Server does not agree that ' 127 'client is authenticated') 128 129 if hostapd.client_has_logged_off(client_mac_address): 130 raise error.TestFail('Client has already logged off') 131 132 # Since the EAP credentials are associated with the 133 # top-most profile, popping it should cause the client 134 # to immediately log-off. 135 manager.PopProfile(self.TEST_PROFILE_NAME) 136 137 if self.get_authenticated_flag(self.INTERFACE_NAME): 138 raise error.TestFail('Client is still authenticated.') 139 140 if not hostapd.client_has_logged_off(client_mac_address): 141 raise error.TestFail('Client did not log off') 142 143 # Re-pushing the profile should make the EAP credentials 144 # available again, and should cause the client to 145 # re-authenticate. 146 manager.PushProfile(self.TEST_PROFILE_NAME) 147 148 if not self.wait_for_authentication(self.INTERFACE_NAME): 149 raise error.TestFail('Re-authentication did not ' 150 'complete.') 151