1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os, shutil, tempfile
6
7import common, constants, cryptohome
8from autotest_lib.client.bin import utils
9from autotest_lib.client.common_lib import autotemp, error
10from autotest_lib.client.cros import cros_ui
11
12
13PK12UTIL = 'pk12util'
14CERTUTIL = 'certutil'
15OPENSSLP12 = 'openssl pkcs12'
16OPENSSLX509 = 'openssl x509'
17OPENSSLRSA = 'openssl rsa'
18OPENSSLREQ = 'openssl req'
19OPENSSLCRYPTO = 'openssl sha1'
20
21TESTUSER = 'ownership_test@chromium.org'
22TESTPASS = 'testme'
23
24
25class OwnershipError(error.TestError):
26    """Generic error for ownership-related failures."""
27    pass
28
29
30class scoped_tempfile(object):
31    """A wrapper that provides scoped semantics for temporary files.
32
33    Providing a file path causes the scoped_tempfile to take ownership of the
34    file at the provided path.  The file at the path will be deleted when this
35    object goes out of scope.  If no path is provided, then a temporary file
36    object will be created for the lifetime of the scoped_tempfile
37
38    autotemp.tempfile objects don't seem to play nicely with being
39    used in system commands, so they can't be used for my purposes.
40    """
41
42    tempdir = autotemp.tempdir(unique_id='ownership')
43
44    def __init__(self, name=None):
45        self.name = name
46        if not self.name:
47            self.fo = tempfile.TemporaryFile()
48
49
50    def __del__(self):
51        if self.name:
52            if os.path.exists(self.name):
53                os.unlink(self.name)
54        else:
55            self.fo.close()  # Will destroy the underlying tempfile
56
57
58def system_output_on_fail(cmd):
59    """Run a |cmd|, capturing output and logging it only on error.
60
61    @param cmd: the command to run.
62    """
63    output = None
64    try:
65        output = utils.system_output(cmd)
66    except:
67        logging.error(output)
68        raise
69
70
71def __unlink(filename):
72    """unlink a file, but log OSError and IOError instead of raising.
73
74    This allows unlinking files that don't exist safely.
75
76    @param filename: the file to attempt to unlink.
77    """
78    try:
79        os.unlink(filename)
80    except (IOError, OSError) as error:
81        logging.info(error)
82
83
84def restart_ui_to_clear_ownership_files():
85    """Remove on-disk state related to device ownership.
86
87    The UI must be stopped while we do this, or the session_manager will
88    write the policy and key files out again.
89    """
90    cros_ui.stop(allow_fail=not cros_ui.is_up())
91    clear_ownership_files_no_restart()
92    cros_ui.start()
93
94
95def clear_ownership_files_no_restart():
96    """Remove on-disk state related to device ownership.
97
98    The UI must be stopped while we do this, or the session_manager will
99    write the policy and key files out again.
100    """
101    if cros_ui.is_up():
102        raise error.TestError("Tried to clear ownership with UI running.")
103    __unlink(constants.OWNER_KEY_FILE)
104    __unlink(constants.SIGNED_POLICY_FILE)
105    __unlink(os.path.join(constants.USER_DATA_DIR, 'Local State'))
106
107
108def fake_ownership():
109    """Fake ownership by generating the necessary magic files."""
110    # Determine the module directory.
111    dirname = os.path.dirname(__file__)
112    mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
113    mock_signedpolicyfile = os.path.join(dirname,
114                                         constants.MOCK_OWNER_POLICY)
115    utils.open_write_close(constants.OWNER_KEY_FILE,
116                           cert_extract_pubkey_der(mock_certfile))
117    shutil.copy(mock_signedpolicyfile,
118                constants.SIGNED_POLICY_FILE)
119
120
121POLICY_TYPE = 'google/chromeos/device'
122
123
124def assert_has_policy_data(response_proto):
125    """Assert that given protobuf has a policy_data field.
126
127    @param response_proto: a PolicyFetchResponse protobuf.
128    @raises OwnershipError on failure.
129    """
130    if not response_proto.HasField("policy_data"):
131        raise OwnershipError('Malformatted response.')
132
133
134def assert_has_device_settings(data_proto):
135    """Assert that given protobuf is a policy with device settings in it.
136
137    @param data_proto: a PolicyData protobuf.
138    @raises OwnershipError if this isn't CrOS policy, or has no settings inside.
139    """
140    if (not data_proto.HasField("policy_type") or
141        data_proto.policy_type != POLICY_TYPE or
142        not data_proto.HasField("policy_value")):
143        raise OwnershipError('Malformatted response.')
144
145
146def assert_username(data_proto, username):
147    """Assert that given protobuf is a policy associated with the given user.
148
149    @param data_proto: a PolicyData protobuf.
150    @param username: the username to check for
151    @raises OwnershipError if data_proto isn't associated with username
152    """
153    if data_proto.username != username:
154        raise OwnershipError('Incorrect username.')
155
156
157def assert_guest_setting(settings, guests):
158    """Assert that given protobuf has given guest-related settings.
159
160    @param settings: a ChromeDeviceSettingsProto protobuf.
161    @param guests: boolean indicating whether guests are allowed to sign in.
162    @raises OwnershipError if settings doesn't enforce the provided setting.
163    """
164    if not settings.HasField("guest_mode_enabled"):
165        raise OwnershipError('No guest mode setting protobuf.')
166    if not settings.guest_mode_enabled.HasField("guest_mode_enabled"):
167        raise OwnershipError('No guest mode setting.')
168    if settings.guest_mode_enabled.guest_mode_enabled != guests:
169        raise OwnershipError('Incorrect guest mode setting.')
170
171
172def assert_show_users(settings, show_users):
173    """Assert that given protobuf has given user-avatar-showing settings.
174
175    @param settings: a ChromeDeviceSettingsProto protobuf.
176    @param show_users: boolean indicating whether avatars are shown on sign in.
177    @raises OwnershipError if settings doesn't enforce the provided setting.
178    """
179    if not settings.HasField("show_user_names"):
180        raise OwnershipError('No show users setting protobuf.')
181    if not settings.show_user_names.HasField("show_user_names"):
182        raise OwnershipError('No show users setting.')
183    if settings.show_user_names.show_user_names != show_users:
184        raise OwnershipError('Incorrect show users setting.')
185
186
187def assert_roaming(settings, roaming):
188    """Assert that given protobuf has given roaming settings.
189
190    @param settings: a ChromeDeviceSettingsProto protobuf.
191    @param roaming: boolean indicating whether roaming is allowed.
192    @raises OwnershipError if settings doesn't enforce the provided setting.
193    """
194    if not settings.HasField("data_roaming_enabled"):
195        raise OwnershipError('No roaming setting protobuf.')
196    if not settings.data_roaming_enabled.HasField("data_roaming_enabled"):
197        raise OwnershipError('No roaming setting.')
198    if settings.data_roaming_enabled.data_roaming_enabled != roaming:
199        raise OwnershipError('Incorrect roaming setting.')
200
201
202def assert_new_users(settings, new_users):
203    """Assert that given protobuf has given new user settings.
204
205    @param settings: a ChromeDeviceSettingsProto protobuf.
206    @param new_users: boolean indicating whether adding users is allowed.
207    @raises OwnershipError if settings doesn't enforce the provided setting.
208    """
209    if not settings.HasField("allow_new_users"):
210        raise OwnershipError('No allow new users setting protobuf.')
211    if not settings.allow_new_users.HasField("allow_new_users"):
212        raise OwnershipError('No allow new users setting.')
213    if settings.allow_new_users.allow_new_users != new_users:
214        raise OwnershipError('Incorrect allow new users setting.')
215
216
217def assert_users_on_whitelist(settings, users):
218    """Assert that given protobuf has given users on the whitelist.
219
220    @param settings: a ChromeDeviceSettingsProto protobuf.
221    @param users: iterable containing usernames that should be on whitelist.
222    @raises OwnershipError if settings doesn't enforce the provided setting.
223    """
224    if settings.HasField("user_whitelist"):
225        for user in users:
226            if user not in settings.user_whitelist.user_whitelist:
227                raise OwnershipError(user + ' not whitelisted.')
228    else:
229        raise OwnershipError('No user whitelist.')
230
231
232def assert_proxy_settings(settings, proxies):
233    """Assert that given protobuf has given proxy settings.
234
235    @param settings: a ChromeDeviceSettingsProto protobuf.
236    @param proxies: dict { 'proxy_mode': <mode string> }
237    @raises OwnershipError if settings doesn't enforce the provided setting.
238    """
239    if not settings.HasField("device_proxy_settings"):
240        raise OwnershipError('No proxy settings protobuf.')
241    if not settings.device_proxy_settings.HasField("proxy_mode"):
242        raise OwnershipError('No proxy_mode setting.')
243    if settings.device_proxy_settings.proxy_mode != proxies['proxy_mode']:
244        raise OwnershipError('Incorrect proxies: %s' % proxies)
245
246
247def __user_nssdb(user):
248    """Returns the path to the NSSDB for the provided user.
249
250    @param user: the user whose NSSDB the caller wants.
251    @return: absolute path to user's NSSDB.
252    """
253    return os.path.join(cryptohome.user_path(user), '.pki', 'nssdb')
254
255
256def use_known_ownerkeys(user):
257    """Sets the system up to use a well-known keypair for owner operations.
258
259    Assuming the appropriate cryptohome is already mounted, configures the
260    device to accept policies signed with the checked-in 'mock' owner key.
261
262    @param user: the user whose NSSDB should be populated with key material.
263    """
264    dirname = os.path.dirname(__file__)
265    mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY)
266    mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
267    push_to_nss(mock_keyfile, mock_certfile, __user_nssdb(user))
268    utils.open_write_close(constants.OWNER_KEY_FILE,
269                           cert_extract_pubkey_der(mock_certfile))
270
271
272def known_privkey():
273    """Returns the mock owner private key in PEM format.
274
275    @return: mock owner private key in PEM format.
276    """
277    dirname = os.path.dirname(__file__)
278    return utils.read_file(os.path.join(dirname, constants.MOCK_OWNER_KEY))
279
280
281def known_pubkey():
282    """Returns the mock owner public key in DER format.
283
284    @return: mock owner public key in DER format.
285    """
286    dirname = os.path.dirname(__file__)
287    return cert_extract_pubkey_der(os.path.join(dirname,
288                                                constants.MOCK_OWNER_CERT))
289
290
291def pairgen():
292    """Generate a self-signed cert and associated private key.
293
294    Generates a self-signed X509 certificate and the associated private key.
295    The key is 2048 bits.  The generated material is stored in PEM format
296    and the paths to the two files are returned.
297
298    The caller is responsible for cleaning up these files.
299
300    @return: (/path/to/private_key, /path/to/self-signed_cert)
301    """
302    keyfile = scoped_tempfile.tempdir.name + '/private.key'
303    certfile = scoped_tempfile.tempdir.name + '/cert.pem'
304    cmd = '%s -x509 -subj %s -newkey rsa:2048 -nodes -keyout %s -out %s' % (
305        OPENSSLREQ, '/CN=me', keyfile, certfile)
306    system_output_on_fail(cmd)
307    return (keyfile, certfile)
308
309
310def pairgen_as_data():
311    """Generates keypair, returns keys as data.
312
313    Generates a fresh owner keypair and then passes back the
314    PEM-encoded private key and the DER-encoded public key.
315
316    @return: (PEM-encoded private key, DER-encoded public key)
317    """
318    (keypath, certpath) = pairgen()
319    keyfile = scoped_tempfile(keypath)
320    certfile = scoped_tempfile(certpath)
321    return (utils.read_file(keyfile.name),
322            cert_extract_pubkey_der(certfile.name))
323
324
325def push_to_nss(keyfile, certfile, nssdb):
326    """Takes a pre-generated key pair and pushes them to an NSS DB.
327
328    Given paths to a private key and cert in PEM format, stores the pair
329    in the provided nssdb.
330
331    @param keyfile: path to PEM-formatted private key file.
332    @param certfile: path to PEM-formatted cert file for associated public key.
333    @param nssdb: path to NSSDB to be populated with the provided keys.
334    """
335    for_push = scoped_tempfile(scoped_tempfile.tempdir.name + '/for_push.p12')
336    cmd = '%s -export -in %s -inkey %s -out %s ' % (
337        OPENSSLP12, certfile, keyfile, for_push.name)
338    cmd += '-passin pass: -passout pass:'
339    system_output_on_fail(cmd)
340    cmd = '%s -d "sql:%s" -i %s -W ""' % (PK12UTIL,
341                                          nssdb,
342                                          for_push.name)
343    system_output_on_fail(cmd)
344
345
346def cert_extract_pubkey_der(pem):
347    """Given a PEM-formatted cert, extracts the public key in DER format.
348
349    Pass in an X509 certificate in PEM format, and you'll get back the
350    DER-formatted public key as a string.
351
352    @param pem: path to a PEM-formatted cert file.
353    @return: DER-encoded public key from cert, as a string.
354    """
355    outfile = scoped_tempfile(scoped_tempfile.tempdir.name + '/pubkey.der')
356    cmd = '%s -in %s -pubkey -noout ' % (OPENSSLX509, pem)
357    cmd += '| %s -outform DER -pubin -out %s' % (OPENSSLRSA,
358                                                 outfile.name)
359    system_output_on_fail(cmd)
360    der = utils.read_file(outfile.name)
361    return der
362
363
364def sign(pem_key, data):
365    """Signs |data| with key from |pem_key|, returns signature.
366
367    Using the PEM-formatted private key in |pem_key|, generates an
368    RSA-with-SHA1 signature over |data| and returns the signature in
369    a string.
370
371    @param pem_key: PEM-formatted private key, as a string.
372    @param data: data to be signed.
373    @return: signature as a string.
374    """
375    sig = scoped_tempfile()
376    err = scoped_tempfile()
377    data_file = scoped_tempfile()
378    data_file.fo.write(data)
379    data_file.fo.seek(0)
380
381    pem_key_file = scoped_tempfile(scoped_tempfile.tempdir.name + '/pkey.pem')
382    utils.open_write_close(pem_key_file.name, pem_key)
383
384    cmd = '%s -sign %s' % (OPENSSLCRYPTO, pem_key_file.name)
385    try:
386        utils.run(cmd,
387                  stdin=data_file.fo,
388                  stdout_tee=sig.fo,
389                  stderr_tee=err.fo)
390    except:
391        err.fo.seek(0)
392        logging.error(err.fo.read())
393        raise
394
395    sig.fo.seek(0)
396    sig_data = sig.fo.read()
397    if not sig_data:
398        raise error.OwnershipError('Empty signature!')
399    return sig_data
400
401
402def get_user_policy_key_filename(username):
403    """Returns the path to the user policy key for the given username.
404
405    @param username: the user whose policy key we want the path to.
406    @return: absolute path to user's policy key file.
407    """
408    return os.path.join(constants.USER_POLICY_DIR,
409                        cryptohome.get_user_hash(username),
410                        constants.USER_POLICY_KEY_FILENAME)
411