1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Utility functions used for PKCS#11 library testing."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12import grp, logging, os, pwd, re, stat, sys, shutil, pwd, grp
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error
16
17USER_TOKEN_PREFIX = 'User TPM Token '
18TMP_CHAPS_DIR = '/tmp/chaps'
19CHAPS_DIR_PERM = 0o750
20SYSTEM_TOKEN_NAME = 'System TPM Token'
21SYSTEM_TOKEN_DIR = '/var/lib/chaps'
22INVALID_SLOT_ID = '100'
23
24
25def __run_cmd(cmd, ignore_status=False):
26    """Runs a command and returns the output from both stdout and stderr."""
27    return utils.system_output(cmd + ' 2>&1', retain_output=True,
28                               ignore_status=ignore_status).strip()
29
30def __get_token_paths(exclude_system_token):
31    """Return a list with a path for each PKCS #11 token currently loaded."""
32    token_paths = []
33    for line in __run_cmd('chaps_client --list').split('\n'):
34        match = re.search(r'Slot \d+: (/.*)$', line)
35        if match:
36            if exclude_system_token and match.group(1) == SYSTEM_TOKEN_DIR:
37                continue
38            token_paths.append(match.group(1))
39    return token_paths
40
41def __get_pkcs11_file_list(token_path):
42    """Return string with PKCS#11 file paths and their associated metadata."""
43    find_args = '-printf "\'%p\', \'%u:%g\', 0%m\n"'
44    file_list_output = __run_cmd('find %s ' % token_path + find_args)
45    return file_list_output
46
47def __get_token_slot_by_path(token_path):
48    token_list = __run_cmd('p11_replay --list_tokens')
49    for line in token_list.split('\n'):
50        match = re.search(r'^Slot (\d+): ' + token_path, line)
51        if not match:
52            continue
53        return match.group(1)
54    return INVALID_SLOT_ID
55
56def __verify_tokenname(token_path):
57    """Verify that the TPM token name is correct."""
58    # The token path is expected to be of the form:
59    # /run/daemon-store/chaps/<obfuscated_user_id>
60    match = re.search(r'/run/daemon-store/chaps/(.*)', token_path)
61    if not match:
62        return False
63    obfuscated_user = match.group(1)
64    # We expect the token label to contain first 16 characters of the obfuscated
65    # user id. This is the same value we extracted from |token_path|.
66    expected_user_token_label = USER_TOKEN_PREFIX + obfuscated_user[:16]
67    # The p11_replay tool will list tokens in the following form:
68    # Slot 1: <token label>
69    token_list = __run_cmd('p11_replay --list_tokens')
70    for line in token_list.split('\n'):
71        match = re.search(r'^Slot \d+: (.*)$', line)
72        if not match:
73            continue
74        token_label = match.group(1).rstrip()
75        if (token_label == expected_user_token_label):
76            return True
77        # Ignore the system token label.
78        if token_label == SYSTEM_TOKEN_NAME:
79            continue
80        logging.error('Unexpected token label: |%s|', token_label)
81    logging.error('Invalid or missing PKCS#11 token label!')
82    return False
83
84def __verify_permissions(token_path):
85    """Verify that the permissions on the initialized token dir are correct."""
86    # List of 3-tuples consisting of (path, user:group, octal permissions).
87    # Can be generated (for example), by:
88    # find <token_path>/chaps -printf "'%p', '%u:%g', 0%m\n"
89    expected_permissions = [
90        (token_path, 'chaps:chronos-access', CHAPS_DIR_PERM),
91        ('%s/database' % token_path, 'chaps:chronos-access', CHAPS_DIR_PERM)]
92    for item in expected_permissions:
93        path = item[0]
94        (user, group) = item[1].split(':')
95        perms = item[2]
96        stat_buf = os.lstat(path)
97        if not stat_buf:
98            logging.error('Could not stat %s while checking for permissions.',
99                          path)
100            return False
101        # Check ownership.
102        path_user = pwd.getpwuid(stat_buf.st_uid).pw_name
103        path_group = grp.getgrgid(stat_buf.st_gid).gr_name
104        if path_user != user or path_group != group:
105            logging.error('Ownership of %s does not match! Got = (%s, %s)'
106                          ', Expected = (%s, %s)', path, path_user, path_group,
107                          user, group)
108            return False
109
110        # Check permissions.
111        path_perms = stat.S_IMODE(stat_buf.st_mode)
112        if path_perms != perms:
113            logging.error('Permissions for %s do not match! (Got = %s'
114                          ', Expected = %s)', path, oct(path_perms), oct(perms))
115            return False
116
117    return True
118
119def verify_pkcs11_initialized():
120    """Checks if the PKCS#11 token is initialized properly."""
121    token_path_list = __get_token_paths(exclude_system_token=True)
122    if len(token_path_list) != 1:
123        logging.error('Expecting a single signed-in user with a token.')
124        return False
125
126    verify_cmd = ('cryptohome --action=pkcs11_is_user_token_ok')
127    __run_cmd(verify_cmd)
128
129    verify_result = True
130    # Do additional sanity tests.
131    if not __verify_tokenname(token_path_list[0]):
132        logging.error('Verification of token name failed!')
133        verify_result = False
134    if not __verify_permissions(token_path_list[0]):
135        logging.error('PKCS#11 file list:\n%s',
136                      __get_pkcs11_file_list(token_path_list[0]))
137        logging.error(
138            'Verification of PKCS#11 subsystem and token permissions failed!')
139        verify_result = False
140    return verify_result
141
142def load_p11_test_token(auth_data='1234'):
143    """Loads the test token onto a slot.
144
145    @param auth_data: The authorization data to use for the token.
146    """
147    utils.system('sudo chaps_client --load --path=%s --auth="%s"' %
148                 (TMP_CHAPS_DIR, auth_data))
149
150def change_p11_test_token_auth_data(auth_data, new_auth_data):
151    """Changes authorization data for the test token.
152
153    @param auth_data: The current authorization data.
154    @param new_auth_data: The new authorization data.
155    """
156    utils.system('sudo chaps_client --change_auth --path=%s --auth="%s" '
157                 '--new_auth="%s"' % (TMP_CHAPS_DIR, auth_data, new_auth_data))
158
159def unload_p11_test_token():
160    """Unloads a loaded test token."""
161    utils.system('sudo chaps_client --unload --path=%s' % TMP_CHAPS_DIR)
162
163def copytree_with_ownership(src, dst):
164    """Like shutil.copytree but also copies owner and group attributes.
165    @param src: Source directory.
166    @param dst: Destination directory.
167    """
168    utils.system('cp -rp %s %s' % (src, dst))
169
170def setup_p11_test_token(unload_user_tokens, auth_data='1234'):
171    """Configures a PKCS #11 token for testing.
172
173    Any existing test token will be automatically cleaned up.
174
175    @param unload_user_tokens: Whether to unload all user tokens.
176    @param auth_data: Initial token authorization data.
177    """
178    cleanup_p11_test_token()
179    if unload_user_tokens:
180        for path in __get_token_paths(exclude_system_token=False):
181            utils.system('sudo chaps_client --unload --path=%s' % path)
182    os.makedirs(TMP_CHAPS_DIR)
183    uid = pwd.getpwnam('chaps')[2]
184    gid = grp.getgrnam('chronos-access')[2]
185    os.chown(TMP_CHAPS_DIR, uid, gid)
186    os.chmod(TMP_CHAPS_DIR, CHAPS_DIR_PERM)
187    load_p11_test_token(auth_data)
188    unload_p11_test_token()
189    copytree_with_ownership(TMP_CHAPS_DIR, '%s_bak' % TMP_CHAPS_DIR)
190
191def restore_p11_test_token():
192    """Restores a PKCS #11 test token to its initial state."""
193    shutil.rmtree(TMP_CHAPS_DIR)
194    copytree_with_ownership('%s_bak' % TMP_CHAPS_DIR, TMP_CHAPS_DIR)
195
196def get_p11_test_token_db_path():
197    """Returns the test token database path."""
198    return '%s/database' % TMP_CHAPS_DIR
199
200def verify_p11_test_token():
201    """Verifies that a test token is working and persistent."""
202    output = __run_cmd('p11_replay --generate --replay_wifi',
203                       ignore_status=True)
204    if not re.search('Sign: CKR_OK', output):
205        print(output, file=sys.stderr)
206        return False
207    unload_p11_test_token()
208    load_p11_test_token()
209    output = __run_cmd('p11_replay --replay_wifi --cleanup',
210                       ignore_status=True)
211    if not re.search('Sign: CKR_OK', output):
212        print(output, file=sys.stderr)
213        return False
214    return True
215
216def cleanup_p11_test_token():
217    """Deletes the test token."""
218    unload_p11_test_token()
219    shutil.rmtree(TMP_CHAPS_DIR, ignore_errors=True)
220    shutil.rmtree('%s_bak' % TMP_CHAPS_DIR, ignore_errors=True)
221
222def wait_for_pkcs11_token():
223    """Waits for the PKCS #11 token to be available.
224
225    This should be called only after a login and is typically called immediately
226    after a login.
227
228    Returns:
229        True if the token is available.
230    """
231    try:
232        utils.poll_for_condition(
233            lambda: utils.system('cryptohome --action=pkcs11_is_user_token_ok',
234                                 ignore_status=True) == 0,
235            desc='PKCS #11 token.',
236            timeout=300)
237    except utils.TimeoutError:
238        return False
239    return True
240
241def __p11_replay_on_user_token(extra_args=''):
242    """Executes a typical command replay on the current user token.
243
244    Args:
245        extra_args: Additional arguments to pass to p11_replay.
246
247    Returns:
248        The command output.
249    """
250    if not wait_for_pkcs11_token():
251       raise error.TestError('Timeout while waiting for pkcs11 token')
252    return __run_cmd('p11_replay --slot=%s %s'
253                     % (__get_token_slot_by_path(USER_TOKEN_PREFIX),
254                        extra_args),
255                     ignore_status=True)
256
257def inject_and_test_key():
258    """Injects a key into a PKCS #11 token and tests that it can sign."""
259    output = __p11_replay_on_user_token('--replay_wifi --inject')
260    return re.search('Sign: CKR_OK', output)
261
262def test_and_cleanup_key():
263    """Tests a PKCS #11 key before deleting it."""
264    output = __p11_replay_on_user_token('--replay_wifi --cleanup')
265    return re.search('Sign: CKR_OK', output)
266
267def generate_user_key():
268    """Generates a key in the current user token."""
269    output = __p11_replay_on_user_token('--replay_wifi --generate')
270    return re.search('Sign: CKR_OK', output)
271
272