1# Copyright 2019 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import re
7import utils
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.cros.enterprise import enterprise_policy_base
11
12
class policy_PlatformKeys(enterprise_policy_base.EnterprisePolicyTest):
    """
    Exercise the certificate workflow of the PlatformKeys test extension.

    The extension is force-installed through user policy and is then driven
    by clicking buttons on its main page and reading back its text fields.

    """
    version = 1


    def initialize(self, **kwargs):
        """
        Set up the attributes shared by the rest of the test.

        @param kwargs: Passed through unchanged to the parent initialize().

        """
        super(policy_PlatformKeys, self).initialize(**kwargs)
        # PlatformKeys extension ID.
        self.EXTENSION_ID = 'hoppbgdeajkagempifacalpdapphfoai'
        self.EXTENSION_PAGE = ('chrome-extension://%s/main.html'
                               % self.EXTENSION_ID)
        # Tab showing the extension page; assigned by wait_for_extension().
        self.tab = None


    def click_button(self, id):
        """
        Click an element given its ID.

        @param id: DOM ID of the element to click (without the leading '#').

        """
        self.tab.ExecuteJavaScript(
                "document.querySelector('#%s').click()" % id)


    def field_value(self, id):
        """
        Return the value of a text field.

        @param id: DOM ID of the field to read (without the leading '#').

        @returns: The 'value' property of the matching element.

        """
        return self.tab.EvaluateJavaScript(
            "document.querySelector('#%s').value" % id)


    def call_api(self, button_id, field_id=None):
        """
        Call the API by clicking a button and checking its output fields.

        The status of the call is read from the element whose ID is
        |button_id| followed by '-error'; the call is considered successful
        once that field contains 'OK'.

        @param button_id: ID of the button element.
        @param field_id: Text field output is printed to (if any).

        @returns: Output of the call, if any; None when no field_id is given.
        @raises error.TestFail: If the API call fails.

        """
        error_id = button_id + '-error'
        self.click_button(button_id)

        # Wait for the API to return 'OK'. The failure message is built only
        # after the poll has timed out, so it reports the status shown on the
        # page at that moment rather than whatever the field held immediately
        # after the click.
        try:
            utils.poll_for_condition(
                    lambda: 'OK' in self.field_value(error_id),
                    timeout=5)
        except utils.TimeoutError:
            raise error.TestFail(
                    'API error: %s' % self.field_value(error_id))

        if field_id:
            return self.field_value(field_id)
        return None


    def create_certificate(self):
        """
        Return a certificate using the generated public key.

        @returns: Contents of the 'certificate' field with trailing
                  whitespace stripped.
        @raises error.TestFail: If the API call fails.

        """
        cert = self.call_api('create-cert', 'certificate')
        return cert.rstrip()


    def list_certificates(self):
        """
        Fetch all certificates and parse them into a list.

        @returns: List with one string per certificate, each spanning from
                  its BEGIN CERTIFICATE line to its END CERTIFICATE line.
                  Empty when the 'certificates' field is empty.
        @raises error.TestFail: If the API call fails.

        """
        raw_certs = self.call_api('list-certs', 'certificates')
        if not raw_certs:
            return []

        # DOTALL lets '.' span the newlines inside a certificate body; the
        # non-greedy match keeps adjacent certificates separate.
        pattern = re.compile('-----BEGIN CERTIFICATE-----.*?'
                             '-----END CERTIFICATE-----', flags=re.DOTALL)
        return pattern.findall(raw_certs)


    def wait_for_extension(self):
        """
        Wait for the extension to install so we can open it.

        Navigates to the extension page once per second for up to 15 seconds,
        until the page contains the '#cert-enrollment' element. On success
        self.tab refers to the tab showing the extension page.

        """
        def load_page():
            """Reload the extension page and report whether it is ready."""
            self.tab = self.navigate_to_url(self.EXTENSION_PAGE, self.tab)
            return self.tab.EvaluateJavaScript(
                      "document.querySelector('#cert-enrollment') !== null")

        utils.poll_for_condition(
            load_page,
            timeout=15,
            sleep_interval=1,
            desc='Timed out waiting for extension to install.')


    def test_platform_keys(self):
        """
        Test the chrome.enterprise.platformKeys API.

        The following API methods are tested:
            - getToken
            - getCertificates
            - importCertificate
            - removeCertificate

        @raises error.TestFail: If any API call fails, or if the certificate
                                list does not match what is expected at any
                                step.

        """
        self.wait_for_extension()

        if self.list_certificates():
            raise error.TestFail('Certificates list should be empty at start.')

        # The returned public key text is not needed here; the call is made
        # for its effect and so that a failing status is reported.
        self.call_api('generate', 'public-key')

        certificate = self.create_certificate()
        self.call_api('import-cert')

        installed_certs = self.list_certificates()
        if len(installed_certs) != 1:
            raise error.TestFail('There should be 1 certificate instead of %s.'
                                 % len(installed_certs))

        if installed_certs[0] != certificate:
            raise error.TestFail('Installed certificate does not match '
                                 'expected certificate. %s != %s' %
                                 (installed_certs[0], certificate))

        self.call_api('remove-cert')

        if self.list_certificates():
            raise error.TestFail('All certificates should have been removed '
                                 'at the end of the test.')


    def run_once(self):
        """
        Setup and run the test configured for the specified test case.

        Force-installs the extension and sets DeveloperToolsAvailability to 1
        through user policy, then runs the platform keys checks.

        """
        self.setup_case(user_policies={
            'ExtensionInstallForcelist': [self.EXTENSION_ID],
            'DeveloperToolsAvailability': 1
        })

        self.test_platform_keys()
145