# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import site_eap_certs
from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
from autotest_lib.client.cros import certificate_util
from autotest_lib.client.cros import shill_temporary_profile
from autotest_lib.client.cros import tpm_store
from autotest_lib.client.cros import vpn_server
from autotest_lib.client.cros.networking import shill_context
from autotest_lib.client.cros.networking import shill_proxy

class network_VPNConnect(test.test):
    """The VPN authentication class.

    Starts up a VPN server within a chroot on the other end of a virtual
    ethernet pair and attempts a VPN association using shill.

    The behavior of a single run is driven entirely by substrings of the
    VPN type string (for example 'xauth', 'user_pass', 'cert_verify' or
    'incorrect'); see get_vpn_server() and get_vpn_client_properties().

    """
    CLIENT_INTERFACE_NAME = 'pseudoethernet0'
    SERVER_INTERFACE_NAME = 'serverethernet0'
    TEST_PROFILE_NAME = 'testVPN'
    CONNECT_TIMEOUT_SECONDS = 15
    version = 1
    SERVER_ADDRESS = '10.9.8.1'
    CLIENT_ADDRESS = '10.9.8.2'
    NETWORK_PREFIX = 24

    def get_device(self, interface_name):
        """Finds the corresponding Device object for an ethernet
        interface with the name |interface_name|.

        @param interface_name string The name of the interface to check.

        @return DBus interface object representing the associated device.

        @raises error.TestFail if no matching device is found.

        """
        device = self._shill_proxy.find_object('Device',
                                               {'Name': interface_name})
        if device is None:
            raise error.TestFail('Device was not found.')

        return device


    def find_ethernet_service(self, interface_name):
        """Finds the corresponding service object for an ethernet
        interface.

        @param interface_name string The name of the associated interface

        @return Service object representing the associated service.

        @raises error.TestFail if the device itself is not found (raised
                by get_device()).

        """
        device = self.get_device(interface_name)
        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
        return self._shill_proxy.find_object('Service', {'Device': device_path})


    def get_vpn_server(self):
        """Returns a VPN server instance.

        The server flavor is selected by the prefix of |self._vpn_type|.

        @return A vpn_server server object for the current VPN type.

        @raises error.TestFail if the VPN type is not recognized.

        """
        if self._vpn_type.startswith('l2tpipsec-psk'):
            return vpn_server.L2TPIPSecVPNServer(
                    'psk',
                    self.SERVER_INTERFACE_NAME,
                    self.SERVER_ADDRESS,
                    self.NETWORK_PREFIX,
                    perform_xauth_authentication=('xauth' in self._vpn_type),
                    local_ip_is_public_ip=('evil' in self._vpn_type))
        elif self._vpn_type.startswith('l2tpipsec-cert'):
            return vpn_server.L2TPIPSecVPNServer('cert',
                                                 self.SERVER_INTERFACE_NAME,
                                                 self.SERVER_ADDRESS,
                                                 self.NETWORK_PREFIX)
        elif self._vpn_type.startswith('openvpn'):
            return vpn_server.OpenVPNServer(self.SERVER_INTERFACE_NAME,
                                            self.SERVER_ADDRESS,
                                            self.NETWORK_PREFIX,
                                            'user_pass' in self._vpn_type)
        else:
            raise error.TestFail('Unknown vpn server type %s' % self._vpn_type)


    def _install_client_credentials(self, tpm):
        """Installs the test client certificate and private key into |tpm|.

        @param tpm object TPM store instance that receives the credentials.

        """
        tpm.install_certificate(site_eap_certs.client_cert_1,
                                site_eap_certs.cert_1_tpm_key_id)
        tpm.install_private_key(site_eap_certs.client_private_key_1,
                                site_eap_certs.cert_1_tpm_key_id)


    def _get_l2tpipsec_psk_properties(self):
        """Builds the service properties for an L2TP/IPSec PSK connection.

        @return dict of properties to configure the service with.

        """
        params = {
            'L2TPIPsec.Password': vpn_server.L2TPIPSecVPNServer.CHAP_SECRET,
            'L2TPIPsec.PSK':
                    vpn_server.L2TPIPSecVPNServer.IPSEC_PRESHARED_KEY,
            'L2TPIPsec.User': vpn_server.L2TPIPSecVPNServer.CHAP_USER,
            'Name': 'test-vpn-l2tp-psk',
            'Provider.Host': self.SERVER_ADDRESS,
            'Provider.Type': 'l2tpipsec',
            'Type': 'vpn'
        }
        if 'xauth' in self._vpn_type:
            if 'incorrect_user' in self._vpn_type:
                params['L2TPIPsec.XauthUser'] = 'wrong_user'
                params['L2TPIPsec.XauthPassword'] = 'wrong_password'
            elif 'incorrect_missing_user' not in self._vpn_type:
                params['L2TPIPsec.XauthUser'] = (
                        vpn_server.L2TPIPSecVPNServer.XAUTH_USER)
                params['L2TPIPsec.XauthPassword'] = (
                        vpn_server.L2TPIPSecVPNServer.XAUTH_PASSWORD)
            # With 'incorrect_missing_user' no XAUTH credentials are set
            # at all.
        return params


    def _get_l2tpipsec_cert_properties(self, tpm):
        """Builds the service properties for an L2TP/IPSec cert connection.

        @param tpm object TPM store instance to add credentials to.

        @return dict of properties to configure the service with.

        """
        self._install_client_credentials(tpm)
        return {
            'L2TPIPsec.CACertPEM': [site_eap_certs.ca_cert_1],
            'L2TPIPsec.ClientCertID': site_eap_certs.cert_1_tpm_key_id,
            'L2TPIPsec.ClientCertSlot': tpm.SLOT_ID,
            'L2TPIPsec.User': vpn_server.L2TPIPSecVPNServer.CHAP_USER,
            'L2TPIPsec.Password': vpn_server.L2TPIPSecVPNServer.CHAP_SECRET,
            'L2TPIPsec.PIN': tpm.PIN,
            'Name': 'test-vpn-l2tp-cert',
            'Provider.Host': self.SERVER_ADDRESS,
            'Provider.Type': 'l2tpipsec',
            'Type': 'vpn'
        }


    def _get_openvpn_properties(self, tpm):
        """Builds the service properties for an OpenVPN connection.

        @param tpm object TPM store instance to add credentials to.

        @return dict of properties to configure the service with.

        """
        self._install_client_credentials(tpm)
        params = {
            'Name': 'test-vpn-openvpn',
            'Provider.Host': self.SERVER_ADDRESS,
            'Provider.Type': 'openvpn',
            'Type': 'vpn',
            'OpenVPN.CACertPEM': [site_eap_certs.ca_cert_1],
            'OpenVPN.Pkcs11.ID': site_eap_certs.cert_1_tpm_key_id,
            'OpenVPN.Pkcs11.PIN': tpm.PIN,
            'OpenVPN.RemoteCertEKU': 'TLS Web Server Authentication',
            'OpenVPN.Verb': '5'
        }
        if 'user_pass' in self._vpn_type:
            params['OpenVPN.User'] = vpn_server.OpenVPNServer.USERNAME
            params['OpenVPN.Password'] = vpn_server.OpenVPNServer.PASSWORD
        if 'cert_verify' in self._vpn_type:
            ca = certificate_util.PEMCertificate(site_eap_certs.ca_cert_1)
            if 'incorrect_hash' in self._vpn_type:
                bogus_hash = ':'.join(['00'] * 20)
                params['OpenVPN.VerifyHash'] = bogus_hash
            else:
                params['OpenVPN.VerifyHash'] = ca.fingerprint
            server = certificate_util.PEMCertificate(
                    site_eap_certs.server_cert_1)
            if 'incorrect_subject' in self._vpn_type:
                params['OpenVPN.VerifyX509Name'] = 'bogus subject name'
            elif 'incorrect_cn' in self._vpn_type:
                params['OpenVPN.VerifyX509Name'] = 'bogus cn'
                params['OpenVPN.VerifyX509Type'] = 'name'
            elif 'cn_only' in self._vpn_type:
                params['OpenVPN.VerifyX509Name'] = server.subject_dict['CN']
                params['OpenVPN.VerifyX509Type'] = 'name'
            else:
                # This is the form OpenVPN expects.
                params['OpenVPN.VerifyX509Name'] = ', '.join(server.subject)
        return params


    def get_vpn_client_properties(self, tpm):
        """Returns VPN configuration properties.

        @param tpm object TPM store instance to add credentials if necessary.

        @return dict of properties to configure the service with.

        @raises error.TestFail if the VPN type is not recognized.

        """
        if self._vpn_type.startswith('l2tpipsec-psk'):
            return self._get_l2tpipsec_psk_properties()
        # Matched by prefix, like get_vpn_server(), so that the client and
        # the server always agree on which types are certificate-based.
        elif self._vpn_type.startswith('l2tpipsec-cert'):
            return self._get_l2tpipsec_cert_properties(tpm)
        elif self._vpn_type.startswith('openvpn'):
            return self._get_openvpn_properties(tpm)
        else:
            raise error.TestFail('Unknown vpn client type %s' % self._vpn_type)


    def connect_vpn(self):
        """Connects the client to the VPN server.

        Configures a service from get_vpn_client_properties(), asks it to
        connect and waits up to CONNECT_TIMEOUT_SECONDS for its state to
        become 'ready' or 'online'.

        @return True if the connection succeeded, False otherwise.

        @raises error.TestFail if the outcome does not match
                |self._expect_success|.

        """
        proxy = self._shill_proxy
        with tpm_store.TPMStore() as tpm:
            service = proxy.configure_service(
                    self.get_vpn_client_properties(tpm))
            service.Connect()
            result = proxy.wait_for_property_in(service,
                                                proxy.SERVICE_PROPERTY_STATE,
                                                ('ready', 'online'),
                                                self.CONNECT_TIMEOUT_SECONDS)
        (successful, _, _) = result
        if not successful and self._expect_success:
            raise error.TestFail('VPN connection failed')
        if successful and not self._expect_success:
            raise error.TestFail('VPN connection suceeded '
                                 'when it should have failed')
        return successful


    def run_once(self, vpn_types=()):
        """Test main loop.

        @param vpn_types iterable of strings, each a VPN type to test.
                Defaults to an empty (immutable) sequence, in which case
                nothing is run.

        """
        self._shill_proxy = shill_proxy.ShillProxy()
        for vpn_type in vpn_types:
            self.run_vpn_test(vpn_type)


    def _verify_tunnel_traffic(self, server):
        """Checks the traffic behavior of an established VPN connection.

        @param server object VPN server instance the client connected to.

        @raises error.TestFail if either ping gives an unexpected result.

        """
        res = utils.ping(server.SERVER_IP_ADDRESS, tries=3,
                         user='chronos')
        if res != 0:
            raise error.TestFail('Error pinging server IP')

        # IPv6 should be blackholed, so ping returns
        # "other error"
        res = utils.ping("2001:db8::1", tries=1,
                         user='chronos')
        if res != 2:
            raise error.TestFail(
                    'IPv6 ping should have aborted')


    def run_vpn_test(self, vpn_type):
        """Run a vpn test of |vpn_type|.

        Any type containing 'incorrect' is expected to fail to connect.

        @param vpn_type string type of VPN test to run.

        @raises error.TestFail if the virtual ethernet pair is unhealthy
                or the test outcome is not the expected one.

        """
        manager = self._shill_proxy.manager
        server_address_and_prefix = '%s/%d' % (self.SERVER_ADDRESS,
                                               self.NETWORK_PREFIX)
        client_address_and_prefix = '%s/%d' % (self.CLIENT_ADDRESS,
                                               self.NETWORK_PREFIX)
        self._vpn_type = vpn_type
        self._expect_success = 'incorrect' not in vpn_type

        with shill_temporary_profile.ShillTemporaryProfile(
                manager, profile_name=self.TEST_PROFILE_NAME):
            with virtual_ethernet_pair.VirtualEthernetPair(
                    interface_name=self.SERVER_INTERFACE_NAME,
                    peer_interface_name=self.CLIENT_INTERFACE_NAME,
                    peer_interface_ip=client_address_and_prefix,
                    interface_ip=server_address_and_prefix,
                    ignore_shutdown_errors=True) as ethernet_pair:
                if not ethernet_pair.is_healthy:
                    raise error.TestFail('Virtual ethernet pair failed.')

                with self.get_vpn_server() as server:
                    # We have to poll and wait the service to be ready in shill
                    # because the shill update of "CLIENT_INTERFACE_NAME" is
                    # async.
                    # NOTE(review): find_ethernet_service() raises TestFail
                    # through get_device() when the device is not known yet;
                    # that would presumably propagate out of the poll rather
                    # than retry -- confirm against poll_for_condition().
                    service = utils.poll_for_condition(
                            lambda: self.find_ethernet_service(
                                    self.CLIENT_INTERFACE_NAME))
                    # When shill finds this ethernet interface, it will reset
                    # its IP address and start a DHCP client.  We must configure
                    # the static IP address through shill.
                    static_ip_config = {'Address': self.CLIENT_ADDRESS,
                                        'Prefixlen': self.NETWORK_PREFIX}
                    with shill_context.StaticIPContext(service,
                                                       static_ip_config):
                        if self.connect_vpn():
                            self._verify_tunnel_traffic(server)