1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from autotest_lib.client.bin import test
6from autotest_lib.client.common_lib import error
7from autotest_lib.client.common_lib import utils
8from autotest_lib.client.common_lib.cros import site_eap_certs
9from autotest_lib.client.common_lib.cros import virtual_ethernet_pair
10from autotest_lib.client.cros import certificate_util
11from autotest_lib.client.cros import shill_temporary_profile
12from autotest_lib.client.cros import tpm_store
13from autotest_lib.client.cros import vpn_server
14from autotest_lib.client.cros.networking import shill_context
15from autotest_lib.client.cros.networking import shill_proxy
16
class network_VPNConnect(test.test):
    """The VPN authentication class.

    Starts up a VPN server within a chroot on the other end of a virtual
    ethernet pair and attempts a VPN association using shill.

    The VPN type string handed to run_vpn_test() drives everything: its
    prefix selects the server and client flavor ('l2tpipsec-psk',
    'l2tpipsec-cert' or 'openvpn'), additional substrings such as 'xauth',
    'user_pass' or 'cert_verify' toggle options, and any type containing
    'incorrect' is expected to fail to connect.

    """
    CLIENT_INTERFACE_NAME = 'pseudoethernet0'
    SERVER_INTERFACE_NAME = 'serverethernet0'
    TEST_PROFILE_NAME = 'testVPN'
    CONNECT_TIMEOUT_SECONDS = 15
    version = 1
    SERVER_ADDRESS = '10.9.8.1'
    CLIENT_ADDRESS = '10.9.8.2'
    NETWORK_PREFIX = 24

    def get_device(self, interface_name):
        """Finds the corresponding Device object for an ethernet
        interface with the name |interface_name|.

        @param interface_name string The name of the interface to check.

        @return DBus interface object representing the associated device.

        @raises error.TestFail if shill reports no such device.

        """
        device = self._shill_proxy.find_object('Device',
                                               {'Name': interface_name})
        if device is None:
            raise error.TestFail('Device was not found.')

        return device


    def find_ethernet_service(self, interface_name):
        """Finds the corresponding service object for an ethernet
        interface.

        @param interface_name string The name of the associated interface

        @return Service object representing the associated service, or
                whatever find_object() yields when nothing matches.

        @raises error.TestFail if the device itself cannot be found.

        """
        device = self.get_device(interface_name)
        device_path = shill_proxy.ShillProxy.dbus2primitive(device.object_path)
        return self._shill_proxy.find_object('Service', {'Device': device_path})


    def _find_ethernet_service_or_none(self, interface_name):
        """Polling-friendly variant of find_ethernet_service().

        get_device() raises as soon as the device is missing, which would
        abort a polling loop on the first attempt.  While waiting for shill
        to notice a new interface, a missing device just means "not ready
        yet", so it is reported as None here instead.

        @param interface_name string The name of the associated interface

        @return Service object if both device and service exist, else None.

        """
        try:
            return self.find_ethernet_service(interface_name)
        except error.TestFail:
            return None


    def get_vpn_server(self):
        """Returns a VPN server instance matching the current VPN type.

        @return VPN server object, selected by the prefix of the VPN type.

        @raises error.TestFail if the VPN type matches no known server.

        """
        if self._vpn_type.startswith('l2tpipsec-psk'):
            return vpn_server.L2TPIPSecVPNServer(
                'psk',
                self.SERVER_INTERFACE_NAME,
                self.SERVER_ADDRESS,
                self.NETWORK_PREFIX,
                perform_xauth_authentication = 'xauth' in self._vpn_type,
                local_ip_is_public_ip = 'evil' in self._vpn_type)

        if self._vpn_type.startswith('l2tpipsec-cert'):
            return vpn_server.L2TPIPSecVPNServer('cert',
                                                 self.SERVER_INTERFACE_NAME,
                                                 self.SERVER_ADDRESS,
                                                 self.NETWORK_PREFIX)

        if self._vpn_type.startswith('openvpn'):
            return vpn_server.OpenVPNServer(self.SERVER_INTERFACE_NAME,
                                            self.SERVER_ADDRESS,
                                            self.NETWORK_PREFIX,
                                            'user_pass' in self._vpn_type)

        raise error.TestFail('Unknown vpn server type %s' % self._vpn_type)


    def _install_client_credentials(self, tpm):
        """Installs the test client certificate and private key on |tpm|.

        @param tpm object TPM store instance receiving the credentials.

        """
        tpm.install_certificate(site_eap_certs.client_cert_1,
                                site_eap_certs.cert_1_tpm_key_id)
        tpm.install_private_key(site_eap_certs.client_private_key_1,
                                site_eap_certs.cert_1_tpm_key_id)


    def _get_l2tpipsec_psk_properties(self):
        """Returns the service properties for an L2TP/IPSec PSK client.

        XAUTH credentials are only added for 'xauth' types: bogus ones for
        'incorrect_user', none at all for 'incorrect_missing_user', and the
        server's real ones otherwise.

        @return dict of shill service properties.

        """
        l2tp = vpn_server.L2TPIPSecVPNServer
        params = {
            'L2TPIPsec.Password': l2tp.CHAP_SECRET,
            'L2TPIPsec.PSK': l2tp.IPSEC_PRESHARED_KEY,
            'L2TPIPsec.User': l2tp.CHAP_USER,
            'Name': 'test-vpn-l2tp-psk',
            'Provider.Host': self.SERVER_ADDRESS,
            'Provider.Type': 'l2tpipsec',
            'Type': 'vpn'
        }
        if 'xauth' in self._vpn_type:
            if 'incorrect_user' in self._vpn_type:
                params['L2TPIPsec.XauthUser'] = 'wrong_user'
                params['L2TPIPsec.XauthPassword'] = 'wrong_password'
            elif 'incorrect_missing_user' not in self._vpn_type:
                params['L2TPIPsec.XauthUser'] = l2tp.XAUTH_USER
                params['L2TPIPsec.XauthPassword'] = l2tp.XAUTH_PASSWORD
        return params


    def _get_l2tpipsec_cert_properties(self, tpm):
        """Returns the service properties for an L2TP/IPSec cert client.

        @param tpm object TPM store instance to add credentials to.

        @return dict of shill service properties.

        """
        self._install_client_credentials(tpm)
        l2tp = vpn_server.L2TPIPSecVPNServer
        return {
            'L2TPIPsec.CACertPEM': [ site_eap_certs.ca_cert_1 ],
            'L2TPIPsec.ClientCertID': site_eap_certs.cert_1_tpm_key_id,
            'L2TPIPsec.ClientCertSlot': tpm.SLOT_ID,
            'L2TPIPsec.User': l2tp.CHAP_USER,
            'L2TPIPsec.Password': l2tp.CHAP_SECRET,
            'L2TPIPsec.PIN': tpm.PIN,
            'Name': 'test-vpn-l2tp-cert',
            'Provider.Host': self.SERVER_ADDRESS,
            'Provider.Type': 'l2tpipsec',
            'Type': 'vpn'
        }


    def _get_openvpn_properties(self, tpm):
        """Returns the service properties for an OpenVPN client.

        'user_pass' types add the server's username and password.
        'cert_verify' types add a certificate hash and an X509 name to
        verify; the 'incorrect_*' variants deliberately supply bogus values.

        @param tpm object TPM store instance to add credentials to.

        @return dict of shill service properties.

        """
        self._install_client_credentials(tpm)
        params = {
            'Name': 'test-vpn-openvpn',
            'Provider.Host': self.SERVER_ADDRESS,
            'Provider.Type': 'openvpn',
            'Type': 'vpn',
            'OpenVPN.CACertPEM': [ site_eap_certs.ca_cert_1 ],
            'OpenVPN.Pkcs11.ID': site_eap_certs.cert_1_tpm_key_id,
            'OpenVPN.Pkcs11.PIN': tpm.PIN,
            'OpenVPN.RemoteCertEKU': 'TLS Web Server Authentication',
            'OpenVPN.Verb': '5'
        }
        if 'user_pass' in self._vpn_type:
            params['OpenVPN.User'] = vpn_server.OpenVPNServer.USERNAME
            params['OpenVPN.Password'] = vpn_server.OpenVPNServer.PASSWORD

        if 'cert_verify' in self._vpn_type:
            ca = certificate_util.PEMCertificate(site_eap_certs.ca_cert_1)
            if 'incorrect_hash' in self._vpn_type:
                params['OpenVPN.VerifyHash'] = ':'.join(['00'] * 20)
            else:
                params['OpenVPN.VerifyHash'] = ca.fingerprint

            server = certificate_util.PEMCertificate(
                    site_eap_certs.server_cert_1)
            if 'incorrect_subject' in self._vpn_type:
                params['OpenVPN.VerifyX509Name'] = 'bogus subject name'
            elif 'incorrect_cn' in self._vpn_type:
                params['OpenVPN.VerifyX509Name'] = 'bogus cn'
                params['OpenVPN.VerifyX509Type'] = 'name'
            elif 'cn_only' in self._vpn_type:
                params['OpenVPN.VerifyX509Name'] = server.subject_dict['CN']
                params['OpenVPN.VerifyX509Type'] = 'name'
            else:
                # This is the form OpenVPN expects.
                params['OpenVPN.VerifyX509Name'] = ', '.join(server.subject)
        return params


    def get_vpn_client_properties(self, tpm):
        """Returns VPN configuration properties.

        The client flavor is chosen by the prefix of the VPN type, using
        the same prefixes as get_vpn_server() so that every type which gets
        a server also gets a client configuration.

        @param tpm object TPM store instance to add credentials if necessary.

        @return dict of shill service properties for the VPN service.

        @raises error.TestFail if the VPN type matches no known client.

        """
        if self._vpn_type.startswith('l2tpipsec-psk'):
            return self._get_l2tpipsec_psk_properties()
        if self._vpn_type.startswith('l2tpipsec-cert'):
            return self._get_l2tpipsec_cert_properties(tpm)
        if self._vpn_type.startswith('openvpn'):
            return self._get_openvpn_properties(tpm)
        raise error.TestFail('Unknown vpn client type %s' % self._vpn_type)


    def connect_vpn(self):
        """Connects the client to the VPN server.

        @return True if the service reached the 'ready' or 'online' state
                within CONNECT_TIMEOUT_SECONDS, False otherwise.

        @raises error.TestFail if the outcome contradicts the expectation
                derived from the VPN type.

        """
        proxy = self._shill_proxy
        # The TPM store only needs to stay open while the connection is
        # being established; the outcome is evaluated afterwards.
        with tpm_store.TPMStore() as tpm:
            service = proxy.configure_service(
                self.get_vpn_client_properties(tpm))
            service.Connect()
            result = proxy.wait_for_property_in(service,
                                                proxy.SERVICE_PROPERTY_STATE,
                                                ('ready', 'online'),
                                                self.CONNECT_TIMEOUT_SECONDS)
        (successful, _, _) = result
        if not successful and self._expect_success:
            raise error.TestFail('VPN connection failed')
        if successful and not self._expect_success:
            raise error.TestFail('VPN connection suceeded '
                                 'when it should have failed')
        return successful


    def run_once(self, vpn_types=()):
        """Test main loop.

        @param vpn_types iterable of strings, the VPN types to test in order.

        """
        self._shill_proxy = shill_proxy.ShillProxy()
        for vpn_type in vpn_types:
            self.run_vpn_test(vpn_type)


    def _verify_connectivity(self, server):
        """Checks traffic over an established VPN connection.

        @param server object the running VPN server instance.

        @raises error.TestFail if the server cannot be pinged or the IPv6
                ping does not return the expected exit status.

        """
        res = utils.ping(server.SERVER_IP_ADDRESS, tries=3, user='chronos')
        if res != 0:
            raise error.TestFail('Error pinging server IP')

        # IPv6 should be blackholed, so ping returns
        # "other error"
        res = utils.ping("2001:db8::1", tries=1, user='chronos')
        if res != 2:
            raise error.TestFail('IPv6 ping should have aborted')


    def run_vpn_test(self, vpn_type):
        """Run a vpn test of |vpn_type|.

        Sets up a temporary shill profile, a virtual ethernet pair and a VPN
        server, gives the client interface a static IP through shill, then
        connects and, if the connection came up, verifies connectivity.

        @param vpn_type string type of VPN test to run.

        @raises error.TestFail if the ethernet pair is unhealthy, the
                connection outcome is unexpected or connectivity checks fail.

        """
        manager = self._shill_proxy.manager
        server_address_and_prefix = '%s/%d' % (self.SERVER_ADDRESS,
                                               self.NETWORK_PREFIX)
        client_address_and_prefix = '%s/%d' % (self.CLIENT_ADDRESS,
                                               self.NETWORK_PREFIX)
        self._vpn_type = vpn_type
        # Negative test cases are flagged by 'incorrect' in their name.
        self._expect_success = 'incorrect' not in vpn_type

        with shill_temporary_profile.ShillTemporaryProfile(
                manager, profile_name=self.TEST_PROFILE_NAME):
            with virtual_ethernet_pair.VirtualEthernetPair(
                    interface_name=self.SERVER_INTERFACE_NAME,
                    peer_interface_name=self.CLIENT_INTERFACE_NAME,
                    peer_interface_ip=client_address_and_prefix,
                    interface_ip=server_address_and_prefix,
                    ignore_shutdown_errors=True) as ethernet_pair:
                if not ethernet_pair.is_healthy:
                    raise error.TestFail('Virtual ethernet pair failed.')

                with self.get_vpn_server() as server:
                    # We have to poll and wait the service to be ready in shill
                    # because the shill update of "CLIENT_INTERFACE_NAME" is
                    # async.  A device that is not visible yet must count as
                    # "not ready" rather than abort the poll, hence the
                    # non-raising lookup.
                    service = utils.poll_for_condition(
                        lambda: self._find_ethernet_service_or_none(
                            self.CLIENT_INTERFACE_NAME))
                    # When shill finds this ethernet interface, it will reset
                    # its IP address and start a DHCP client.  We must configure
                    # the static IP address through shill.
                    static_ip_config = {'Address' : self.CLIENT_ADDRESS,
                                        'Prefixlen' : self.NETWORK_PREFIX}
                    with shill_context.StaticIPContext(service,
                                                       static_ip_config):
                        # Expected failures return False from connect_vpn();
                        # there is nothing further to verify for them.
                        if self.connect_vpn():
                            self._verify_connectivity(server)
255