1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5 6from autotest_lib.client.bin import test 7from autotest_lib.client.common_lib import error 8from autotest_lib.client.common_lib import utils 9from autotest_lib.client.common_lib.cros import site_eap_certs 10from autotest_lib.client.common_lib.cros import virtual_ethernet_pair 11from autotest_lib.client.cros import certificate_util 12from autotest_lib.client.cros import shill_temporary_profile 13from autotest_lib.client.cros import tpm_store 14from autotest_lib.client.cros import vpn_server 15from autotest_lib.client.cros.networking import shill_proxy 16 17class network_VPNConnect(test.test): 18 """The VPN authentication class. 19 20 Starts up a VPN server within a chroot on the other end of a virtual 21 ethernet pair and attempts a VPN association using shill. 22 23 """ 24 CLIENT_INTERFACE_NAME = 'pseudoethernet0' 25 SERVER_INTERFACE_NAME = 'serverethernet0' 26 TEST_PROFILE_NAME = 'testVPN' 27 CONNECT_TIMEOUT_SECONDS = 15 28 version = 1 29 SERVER_ADDRESS = '10.9.8.1' 30 CLIENT_ADDRESS = '10.9.8.2' 31 NETWORK_PREFIX = 24 32 33 def get_device(self, interface_name): 34 """Finds the corresponding Device object for an ethernet 35 interface with the name |interface_name|. 36 37 @param interface_name string The name of the interface to check. 38 39 @return DBus interface object representing the associated device. 40 41 """ 42 device = self._shill_proxy.find_object('Device', 43 {'Name': interface_name}) 44 if device is None: 45 raise error.TestFail('Device was not found.') 46 47 return device 48 49 50 def find_ethernet_service(self, interface_name): 51 """Finds the corresponding service object for an ethernet 52 interface. 53 54 @param interface_name string The name of the associated interface 55 56 @return Service object representing the associated service. 57 58 """ 59 device = self.get_device(interface_name) 60 device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path) 61 return self._shill_proxy.find_object('Service', {'Device': device_path}) 62 63 64 def configure_static_ip(self, interface_name, address, prefix_len): 65 """Configures the Static IP parameters for the Ethernet interface 66 |interface_name| and applies those parameters to the interface by 67 forcing a re-connect. 68 69 @param interface_name string The name of the associated interface. 70 @param address string the IP address this interface should have. 71 @param prefix_len string the IP address prefix for the interface. 72 73 """ 74 service = self.find_ethernet_service(interface_name) 75 service.SetProperty('StaticIP.Address', address) 76 service.SetProperty('StaticIP.Prefixlen', prefix_len) 77 service.Disconnect() 78 service.Connect() 79 80 81 def get_vpn_server(self): 82 """Returns a VPN server instance.""" 83 if self._vpn_type.startswith('l2tpipsec-psk'): 84 return vpn_server.L2TPIPSecVPNServer( 85 'psk', 86 self.SERVER_INTERFACE_NAME, 87 self.SERVER_ADDRESS, 88 self.NETWORK_PREFIX, 89 perform_xauth_authentication = 'xauth' in self._vpn_type, 90 local_ip_is_public_ip = 'evil' in self._vpn_type) 91 elif self._vpn_type.startswith('l2tpipsec-cert'): 92 return vpn_server.L2TPIPSecVPNServer('cert', 93 self.SERVER_INTERFACE_NAME, 94 self.SERVER_ADDRESS, 95 self.NETWORK_PREFIX) 96 elif self._vpn_type.startswith('openvpn'): 97 return vpn_server.OpenVPNServer(self.SERVER_INTERFACE_NAME, 98 self.SERVER_ADDRESS, 99 self.NETWORK_PREFIX, 100 'user_pass' in self._vpn_type) 101 else: 102 raise error.TestFail('Unknown vpn server type %s' % self._vpn_type) 103 104 105 def get_vpn_client_properties(self, tpm): 106 """Returns VPN configuration properties. 107 108 @param tpm object TPM store instance to add credentials if necessary. 109 110 """ 111 if self._vpn_type.startswith('l2tpipsec-psk'): 112 params = { 113 'L2TPIPsec.Password': vpn_server.L2TPIPSecVPNServer.CHAP_SECRET, 114 'L2TPIPsec.PSK': 115 vpn_server.L2TPIPSecVPNServer.IPSEC_PRESHARED_KEY, 116 'L2TPIPsec.User':vpn_server.L2TPIPSecVPNServer.CHAP_USER, 117 'Name': 'test-vpn-l2tp-psk', 118 'Provider.Host': self.SERVER_ADDRESS, 119 'Provider.Type': 'l2tpipsec', 120 'Type': 'vpn', 121 'VPN.Domain': 'test-vpn-psk-domain' 122 } 123 if 'xauth' in self._vpn_type: 124 if 'incorrect_user' in self._vpn_type: 125 params['L2TPIPsec.XauthUser'] = 'wrong_user' 126 params['L2TPIPsec.XauthPassword'] = 'wrong_password' 127 elif 'incorrect_missing_user' not in self._vpn_type: 128 params['L2TPIPsec.XauthUser'] = ( 129 vpn_server.L2TPIPSecVPNServer.XAUTH_USER) 130 params['L2TPIPsec.XauthPassword'] = ( 131 vpn_server.L2TPIPSecVPNServer.XAUTH_PASSWORD) 132 return params 133 elif self._vpn_type == 'l2tpipsec-cert': 134 tpm.install_certificate(site_eap_certs.client_cert_1, 135 site_eap_certs.cert_1_tpm_key_id) 136 tpm.install_private_key(site_eap_certs.client_private_key_1, 137 site_eap_certs.cert_1_tpm_key_id) 138 return { 139 'L2TPIPsec.CACertPEM': [ site_eap_certs.ca_cert_1 ], 140 'L2TPIPsec.ClientCertID': site_eap_certs.cert_1_tpm_key_id, 141 'L2TPIPsec.ClientCertSlot': tpm.SLOT_ID, 142 'L2TPIPsec.User':vpn_server.L2TPIPSecVPNServer.CHAP_USER, 143 'L2TPIPsec.Password': vpn_server.L2TPIPSecVPNServer.CHAP_SECRET, 144 'L2TPIPsec.PIN': tpm.PIN, 145 'Name': 'test-vpn-l2tp-cert', 146 'Provider.Host': self.SERVER_ADDRESS, 147 'Provider.Type': 'l2tpipsec', 148 'Type': 'vpn', 149 'VPN.Domain': 'test-vpn-psk-domain' 150 } 151 elif self._vpn_type.startswith('openvpn'): 152 tpm.install_certificate(site_eap_certs.client_cert_1, 153 site_eap_certs.cert_1_tpm_key_id) 154 tpm.install_private_key(site_eap_certs.client_private_key_1, 155 site_eap_certs.cert_1_tpm_key_id) 156 params = { 157 'Name': 'test-vpn-openvpn', 158 'Provider.Host': self.SERVER_ADDRESS, 159 'Provider.Type': 'openvpn', 160 'Type': 'vpn', 161 'VPN.Domain': 'test-openvpn-domain', 162 'OpenVPN.CACertPEM': [ site_eap_certs.ca_cert_1 ], 163 'OpenVPN.Pkcs11.ID': site_eap_certs.cert_1_tpm_key_id, 164 'OpenVPN.Pkcs11.PIN': tpm.PIN, 165 'OpenVPN.RemoteCertEKU': 'TLS Web Server Authentication', 166 'OpenVPN.Verb': '5' 167 } 168 if 'user_pass' in self._vpn_type: 169 params['OpenVPN.User'] = vpn_server.OpenVPNServer.USERNAME 170 params['OpenVPN.Password'] = vpn_server.OpenVPNServer.PASSWORD 171 if 'cert_verify' in self._vpn_type: 172 ca = certificate_util.PEMCertificate(site_eap_certs.ca_cert_1) 173 if 'incorrect_hash' in self._vpn_type: 174 bogus_hash = ':'.join(['00'] * 20) 175 params['OpenVPN.VerifyHash'] = bogus_hash 176 else: 177 params['OpenVPN.VerifyHash'] = ca.fingerprint 178 server = certificate_util.PEMCertificate( 179 site_eap_certs.server_cert_1) 180 if 'incorrect_subject' in self._vpn_type: 181 params['OpenVPN.VerifyX509Name'] = 'bogus subject name' 182 elif 'incorrect_cn' in self._vpn_type: 183 params['OpenVPN.VerifyX509Name'] = 'bogus cn' 184 params['OpenVPN.VerifyX509Type'] = 'name' 185 elif 'cn_only' in self._vpn_type: 186 params['OpenVPN.VerifyX509Name'] = server.subject_dict['CN'] 187 params['OpenVPN.VerifyX509Type'] = 'name' 188 else: 189 # This is the form OpenVPN expects. 190 params['OpenVPN.VerifyX509Name'] = ', '.join(server.subject) 191 return params 192 else: 193 raise error.TestFail('Unknown vpn client type %s' % self._vpn_type) 194 195 196 def connect_vpn(self): 197 """Connects the client to the VPN server.""" 198 proxy = self._shill_proxy 199 with tpm_store.TPMStore() as tpm: 200 service = proxy.get_service(self.get_vpn_client_properties(tpm)) 201 service.Connect() 202 result = proxy.wait_for_property_in(service, 203 proxy.SERVICE_PROPERTY_STATE, 204 ('ready', 'online'), 205 self.CONNECT_TIMEOUT_SECONDS) 206 (successful, _, _) = result 207 if not successful and self._expect_success: 208 raise error.TestFail('VPN connection failed') 209 if successful and not self._expect_success: 210 raise error.TestFail('VPN connection suceeded ' 211 'when it should have failed') 212 return successful 213 214 215 def run_once(self, vpn_types=[]): 216 """Test main loop.""" 217 self._shill_proxy = shill_proxy.ShillProxy() 218 for vpn_type in vpn_types: 219 self.run_vpn_test(vpn_type) 220 221 222 def run_vpn_test(self, vpn_type): 223 """Run a vpn test of |vpn_type|. 224 225 @param vpn_type string type of VPN test to run. 226 227 """ 228 manager = self._shill_proxy.manager 229 server_address_and_prefix = '%s/%d' % (self.SERVER_ADDRESS, 230 self.NETWORK_PREFIX) 231 client_address_and_prefix = '%s/%d' % (self.CLIENT_ADDRESS, 232 self.NETWORK_PREFIX) 233 self._vpn_type = vpn_type 234 self._expect_success = 'incorrect' not in vpn_type 235 236 with shill_temporary_profile.ShillTemporaryProfile( 237 manager, profile_name=self.TEST_PROFILE_NAME): 238 with virtual_ethernet_pair.VirtualEthernetPair( 239 interface_name=self.SERVER_INTERFACE_NAME, 240 peer_interface_name=self.CLIENT_INTERFACE_NAME, 241 peer_interface_ip=client_address_and_prefix, 242 interface_ip=server_address_and_prefix, 243 ignore_shutdown_errors=True) as ethernet_pair: 244 if not ethernet_pair.is_healthy: 245 raise error.TestFail('Virtual ethernet pair failed.') 246 247 # When shill finds this ethernet interface, it will reset 248 # its IP address and start a DHCP client. We must configure 249 # the static IP address through shill. 250 self.configure_static_ip(self.CLIENT_INTERFACE_NAME, 251 self.CLIENT_ADDRESS, 252 self.NETWORK_PREFIX) 253 254 with self.get_vpn_server() as server: 255 if self.connect_vpn(): 256 res = utils.ping(server.SERVER_IP_ADDRESS, tries=3, 257 user='chronos') 258 if res != 0: 259 raise error.TestFail('Error pinging server IP') 260 261 # IPv6 should be blackholed, so ping returns 262 # "other error" 263 res = utils.ping("2001:db8::1", tries=1, user='chronos') 264 if res != 2: 265 raise error.TestFail('IPv6 ping should ' 266 'have aborted') 267