1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import dbus, grp, os, pwd, stat 6from dbus.mainloop.glib import DBusGMainLoop 7 8from autotest_lib.client.bin import test 9from autotest_lib.client.common_lib import error 10from autotest_lib.client.common_lib.cros import policy, session_manager 11from autotest_lib.client.cros import cros_ui, cryptohome, ownership 12 13 14class login_UserPolicyKeys(test.test): 15 """Verifies that, after user policy is pushed, the user policy key winds 16 up stored in the right place. 17 """ 18 version = 1 19 20 def _can_read(self, uid, gid, info): 21 """Returns true if uid or gid can read a file with the info stat.""" 22 if uid == info.st_uid: 23 return info.st_mode & stat.S_IRUSR 24 if gid == info.st_gid: 25 return info.st_mode & stat.S_IRGRP 26 return info.st_mode & stat.S_IROTH 27 28 29 def _can_execute(self, uid, gid, info): 30 """Returns true if uid or gid can execute a file with the info stat.""" 31 if uid == info.st_uid: 32 return info.st_mode & stat.S_IXUSR 33 if gid == info.st_gid: 34 return info.st_mode & stat.S_IXGRP 35 return info.st_mode & stat.S_IXOTH 36 37 38 def _verify_key_file(self, key_file): 39 """Verifies that the key file has been created and is readable.""" 40 if not os.path.isfile(key_file): 41 raise error.TestFail('%s does not exist!' % key_file) 42 # And is readable by chronos. 43 chronos_uid = pwd.getpwnam('chronos').pw_uid 44 chronos_gid = grp.getgrnam('chronos').gr_gid 45 info = os.stat(key_file) 46 if not stat.S_ISREG(info.st_mode): 47 raise error.TestFail('%s is not a regular file' % key_file) 48 if not self._can_read(chronos_uid, chronos_gid, info): 49 raise error.TestFail('chronos can\' read %s, mode is %s' % 50 (key_file, oct(info.st_mode))) 51 # All the parent directories must be executable by chronos. 52 current = key_file 53 parent = os.path.dirname(current) 54 while current != parent: 55 current = parent 56 parent = os.path.dirname(parent) 57 info = os.stat(current) 58 mode = stat.S_IMODE(info.st_mode) 59 if not self._can_execute(chronos_uid, chronos_gid, info): 60 raise error.TestFail('chronos can\'t execute %s, mode is %s' % 61 (current, oct(info.st_mode))) 62 63 64 def initialize(self): 65 super(login_UserPolicyKeys, self).initialize() 66 policy.install_protobufs(self.autodir, self.job) 67 self._bus_loop = DBusGMainLoop(set_as_default=True) 68 self._cryptohome_proxy = cryptohome.CryptohomeProxy( 69 self._bus_loop, self.autodir, self.job) 70 71 # Clear the user's vault, to make sure the test starts without any 72 # policy or key lingering around. At this stage the session isn't 73 # started and there's no user signed in. 74 ownership.restart_ui_to_clear_ownership_files() 75 self._cryptohome_proxy.remove(ownership.TESTUSER) 76 77 78 def run_once(self): 79 # Mount the vault, connect to session_manager and start the session. 80 self._cryptohome_proxy.mount(ownership.TESTUSER, 81 ownership.TESTPASS, 82 create=True) 83 sm = session_manager.connect(self._bus_loop) 84 sm.StartSession(ownership.TESTUSER, '') 85 86 # No policy stored yet. 87 retrieved_policy = sm.RetrievePolicyEx( 88 session_manager.make_user_policy_descriptor(ownership.TESTUSER), 89 byte_arrays=True) 90 if retrieved_policy: 91 raise error.TestError('session_manager already has user policy!') 92 93 # And no user key exists. 94 key_file = ownership.get_user_policy_key_filename(ownership.TESTUSER) 95 if os.path.exists(key_file): 96 raise error.TestFail('%s exists before storing user policy!' % 97 key_file) 98 99 # Now store a policy. This is building a device policy protobuf, but 100 # that's fine as far as the session_manager is concerned; it's the 101 # outer PolicyFetchResponse that contains the public_key. 102 public_key = ownership.known_pubkey() 103 private_key = ownership.known_privkey() 104 policy_data = policy.build_policy_data() 105 policy_response = policy.generate_policy(private_key, 106 public_key, 107 policy_data) 108 try: 109 sm.StorePolicyEx( 110 session_manager.make_user_policy_descriptor(ownership.TESTUSER), 111 dbus.ByteArray(policy_response)) 112 except dbus.exceptions.DBusException as e: 113 raise error.TestFail('Failed to store user policy', e) 114 115 # The policy key should have been created now. 116 self._verify_key_file(key_file) 117 118 # Restart the ui; the key should be deleted. 119 self._cryptohome_proxy.unmount(ownership.TESTUSER) 120 cros_ui.restart() 121 if os.path.exists(key_file): 122 raise error.TestFail('%s exists after restarting ui!' % 123 key_file) 124 125 # Starting a new session will restore the key that was previously 126 # stored. Reconnect to the session_manager, since the restart killed it. 127 self._cryptohome_proxy.mount(ownership.TESTUSER, 128 ownership.TESTPASS, 129 create=True) 130 sm = session_manager.connect(self._bus_loop) 131 sm.StartSession(ownership.TESTUSER, '') 132 self._verify_key_file(key_file) 133 134 135 def cleanup(self): 136 cros_ui.restart() 137 self._cryptohome_proxy.remove(ownership.TESTUSER) 138 super(login_UserPolicyKeys, self).cleanup() 139