1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import dbus, gobject, logging, os, random, re, shutil, string 6from dbus.mainloop.glib import DBusGMainLoop 7 8import common, constants 9from autotest_lib.client.bin import utils 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.cros.cros_disks import DBusClient 12 13CRYPTOHOME_CMD = '/usr/sbin/cryptohome' 14GUEST_USER_NAME = '$guest' 15UNAVAILABLE_ACTION = 'Unknown action or no action given.' 16 17class ChromiumOSError(error.TestError): 18 """Generic error for ChromiumOS-specific exceptions.""" 19 pass 20 21def __run_cmd(cmd): 22 return utils.system_output(cmd + ' 2>&1', retain_output=True, 23 ignore_status=True).strip() 24 25def get_user_hash(user): 26 """Get the user hash for the given user.""" 27 return utils.system_output(['cryptohome', '--action=obfuscate_user', 28 '--user=%s' % user]) 29 30 31def user_path(user): 32 """Get the user mount point for the given user.""" 33 return utils.system_output(['cryptohome-path', 'user', user]) 34 35 36def system_path(user): 37 """Get the system mount point for the given user.""" 38 return utils.system_output(['cryptohome-path', 'system', user]) 39 40 41def ensure_clean_cryptohome_for(user, password=None): 42 """Ensure a fresh cryptohome exists for user. 43 44 @param user: user who needs a shiny new cryptohome. 45 @param password: if unset, a random password will be used. 46 """ 47 if not password: 48 password = ''.join(random.sample(string.ascii_lowercase, 6)) 49 remove_vault(user) 50 mount_vault(user, password, create=True) 51 52 53def get_tpm_status(): 54 """Get the TPM status. 55 56 Returns: 57 A TPM status dictionary, for example: 58 { 'Enabled': True, 59 'Owned': True, 60 'Being Owned': False, 61 'Ready': True, 62 'Password': '' 63 } 64 """ 65 out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_status') 66 status = {} 67 for field in ['Enabled', 'Owned', 'Being Owned', 'Ready']: 68 match = re.search('TPM %s: (true|false)' % field, out) 69 if not match: 70 raise ChromiumOSError('Invalid TPM status: "%s".' % out) 71 status[field] = match.group(1) == 'true' 72 match = re.search('TPM Password: (\w*)', out) 73 status['Password'] = '' 74 if match: 75 status['Password'] = match.group(1) 76 return status 77 78 79def get_tpm_more_status(): 80 """Get more of the TPM status. 81 82 Returns: 83 A TPM more status dictionary, for example: 84 { 'dictionary_attack_lockout_in_effect': False, 85 'attestation_prepared': False, 86 'boot_lockbox_finalized': False, 87 'enabled': True, 88 'owned': True, 89 'owner_password': '' 90 'dictionary_attack_counter': 0, 91 'dictionary_attack_lockout_seconds_remaining': 0, 92 'dictionary_attack_threshold': 10, 93 'attestation_enrolled': False, 94 'initialized': True, 95 'verified_boot_measured': False, 96 'install_lockbox_finalized': True 97 } 98 An empty dictionary is returned if the command is not supported. 99 """ 100 status = {} 101 out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_more_status | grep :') 102 if out.startswith(UNAVAILABLE_ACTION): 103 # --action=tpm_more_status only exists >= 41. 104 logging.info('Method not supported!') 105 return status 106 for line in out.splitlines(): 107 items = line.strip().split(':') 108 if items[1].strip() == 'false': 109 value = False 110 elif items[1].strip() == 'true': 111 value = True 112 elif items[1].strip().isdigit(): 113 value = int(items[1].strip()) 114 else: 115 value = items[1].strip(' "') 116 status[items[0]] = value 117 return status 118 119 120def is_tpm_lockout_in_effect(): 121 """Returns true if the TPM lockout is in effect; false otherwise.""" 122 status = get_tpm_more_status() 123 return status.get('dictionary_attack_lockout_in_effect', None) 124 125 126def get_login_status(): 127 """Query the login status 128 129 Returns: 130 A login status dictionary containing: 131 { 'owner_user_exists': True|False, 132 'boot_lockbox_finalized': True|False 133 } 134 """ 135 out = __run_cmd(CRYPTOHOME_CMD + ' --action=get_login_status') 136 status = {} 137 for field in ['owner_user_exists', 'boot_lockbox_finalized']: 138 match = re.search('%s: (true|false)' % field, out) 139 if not match: 140 raise ChromiumOSError('Invalid login status: "%s".' % out) 141 status[field] = match.group(1) == 'true' 142 return status 143 144 145def get_tpm_attestation_status(): 146 """Get the TPM attestation status. Works similar to get_tpm_status(). 147 """ 148 out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_attestation_status') 149 status = {} 150 for field in ['Prepared', 'Enrolled']: 151 match = re.search('Attestation %s: (true|false)' % field, out) 152 if not match: 153 raise ChromiumOSError('Invalid attestation status: "%s".' % out) 154 status[field] = match.group(1) == 'true' 155 return status 156 157 158def take_tpm_ownership(): 159 """Take TPM owernship. 160 161 Blocks until TPM is owned. 162 """ 163 __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_take_ownership') 164 __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_wait_ownership') 165 166 167def verify_ek(): 168 """Verify the TPM endorsement key. 169 170 Returns true if EK is valid. 171 """ 172 cmd = CRYPTOHOME_CMD + ' --action=tpm_verify_ek' 173 return (utils.system(cmd, ignore_status=True) == 0) 174 175 176def remove_vault(user): 177 """Remove the given user's vault from the shadow directory.""" 178 logging.debug('user is %s', user) 179 user_hash = get_user_hash(user) 180 logging.debug('Removing vault for user %s with hash %s' % (user, user_hash)) 181 cmd = CRYPTOHOME_CMD + ' --action=remove --force --user=%s' % user 182 __run_cmd(cmd) 183 # Ensure that the vault does not exist. 184 if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 185 raise ChromiumOSError('Cryptohome could not remove the user\'s vault.') 186 187 188def remove_all_vaults(): 189 """Remove any existing vaults from the shadow directory. 190 191 This function must be run with root privileges. 192 """ 193 for item in os.listdir(constants.SHADOW_ROOT): 194 abs_item = os.path.join(constants.SHADOW_ROOT, item) 195 if os.path.isdir(os.path.join(abs_item, 'vault')): 196 logging.debug('Removing vault for user with hash %s' % item) 197 shutil.rmtree(abs_item) 198 199 200def mount_vault(user, password, create=False): 201 """Mount the given user's vault.""" 202 args = [CRYPTOHOME_CMD, '--action=mount', '--user=%s' % user, 203 '--password=%s' % password, '--async'] 204 if create: 205 args.append('--create') 206 logging.info(__run_cmd(' '.join(args))) 207 # Ensure that the vault exists in the shadow directory. 208 user_hash = get_user_hash(user) 209 if not os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 210 raise ChromiumOSError('Cryptohome vault not found after mount.') 211 # Ensure that the vault is mounted. 212 if not is_vault_mounted( 213 user=user, 214 device_regex=constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER, 215 allow_fail=True): 216 raise ChromiumOSError('Cryptohome created a vault but did not mount.') 217 218 219def mount_guest(): 220 """Mount the given user's vault.""" 221 args = [CRYPTOHOME_CMD, '--action=mount_guest', '--async'] 222 logging.info(__run_cmd(' '.join(args))) 223 # Ensure that the guest tmpfs is mounted. 224 if not is_guest_vault_mounted(allow_fail=True): 225 raise ChromiumOSError('Cryptohome did not mount tmpfs.') 226 227 228def test_auth(user, password): 229 cmd = [CRYPTOHOME_CMD, '--action=test_auth', '--user=%s' % user, 230 '--password=%s' % password, '--async'] 231 return 'Authentication succeeded' in utils.system_output(cmd) 232 233 234def unmount_vault(user): 235 """Unmount the given user's vault. 236 237 Once unmounting for a specific user is supported, the user parameter will 238 name the target user. See crosbug.com/20778. 239 """ 240 __run_cmd(CRYPTOHOME_CMD + ' --action=unmount') 241 # Ensure that the vault is not mounted. 242 if is_vault_mounted(user, allow_fail=True): 243 raise ChromiumOSError('Cryptohome did not unmount the user.') 244 245 246def __get_mount_info(mount_point, allow_fail=False): 247 """Get information about the active mount at a given mount point.""" 248 cryptohomed_path = '/proc/$(pgrep cryptohomed)/mounts' 249 try: 250 logging.info(utils.system_output('cat %s' % cryptohomed_path)) 251 mount_line = utils.system_output( 252 'grep %s %s' % (mount_point, cryptohomed_path), 253 ignore_status=allow_fail) 254 except Exception as e: 255 logging.error(e) 256 raise ChromiumOSError('Could not get info about cryptohome vault ' 257 'through %s. See logs for complete mount-point.' 258 % os.path.dirname(str(mount_point))) 259 return mount_line.split() 260 261 262def __get_user_mount_info(user, allow_fail=False): 263 """Get information about the active mounts for a given user. 264 265 Returns the active mounts at the user's user and system mount points. If no 266 user is given, the active mount at the shared mount point is returned 267 (regular users have a bind-mount at this mount point for backwards 268 compatibility; the guest user has a mount at this mount point only). 269 """ 270 return [__get_mount_info(mount_point=user_path(user), 271 allow_fail=allow_fail), 272 __get_mount_info(mount_point=system_path(user), 273 allow_fail=allow_fail)] 274 275def is_vault_mounted( 276 user, 277 device_regex=constants.CRYPTOHOME_DEV_REGEX_ANY, 278 fs_regex=constants.CRYPTOHOME_FS_REGEX_ANY, 279 allow_fail=False): 280 """Check whether a vault is mounted for the given user. 281 282 If no user is given, the shared mount point is checked, determining whether 283 a vault is mounted for any user. 284 """ 285 user_mount_info = __get_user_mount_info(user=user, allow_fail=allow_fail) 286 for mount_info in user_mount_info: 287 if (len(mount_info) < 3 or 288 not re.match(device_regex, mount_info[0]) or 289 not re.match(fs_regex, mount_info[2])): 290 return False 291 return True 292 293 294def is_guest_vault_mounted(allow_fail=False): 295 """Check whether a vault backed by tmpfs is mounted for the guest user.""" 296 return is_vault_mounted( 297 user=GUEST_USER_NAME, 298 device_regex=constants.CRYPTOHOME_DEV_REGEX_GUEST, 299 fs_regex=constants.CRYPTOHOME_FS_REGEX_TMPFS, 300 allow_fail=allow_fail) 301 302 303def get_mounted_vault_devices(user, allow_fail=False): 304 """Get the device(s) backing the vault mounted for the given user. 305 306 Returns the devices mounted at the user's user and system mount points. If 307 no user is given, the device mounted at the shared mount point is returned. 308 """ 309 return [mount_info[0] 310 for mount_info 311 in __get_user_mount_info(user=user, allow_fail=allow_fail) 312 if len(mount_info)] 313 314 315def canonicalize(credential): 316 """Perform basic canonicalization of |email_address|. 317 318 Perform basic canonicalization of |email_address|, taking into account that 319 gmail does not consider '.' or caps inside a username to matter. It also 320 ignores everything after a '+'. For example, 321 c.masone+abc@gmail.com == cMaSone@gmail.com, per 322 http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313 323 """ 324 if not credential: 325 return None 326 327 parts = credential.split('@') 328 if len(parts) != 2: 329 raise error.TestError('Malformed email: ' + credential) 330 331 (name, domain) = parts 332 name = name.partition('+')[0] 333 if (domain == constants.SPECIAL_CASE_DOMAIN): 334 name = name.replace('.', '') 335 return '@'.join([name, domain]).lower() 336 337 338def crash_cryptohomed(): 339 # Try to kill cryptohomed so we get something to work with. 340 pid = __run_cmd('pgrep cryptohomed') 341 try: 342 pid = int(pid) 343 except ValueError, e: # empty or invalid string 344 raise error.TestError('Cryptohomed was not running') 345 utils.system('kill -ABRT %d' % pid) 346 # CONT just in case cryptohomed had a spurious STOP. 347 utils.system('kill -CONT %d' % pid) 348 utils.poll_for_condition( 349 lambda: utils.system('ps -p %d' % pid, 350 ignore_status=True) != 0, 351 timeout=180, 352 exception=error.TestError( 353 'Timeout waiting for cryptohomed to coredump')) 354 355 356class CryptohomeProxy(DBusClient): 357 """A DBus proxy client for testing the Cryptohome DBus server. 358 """ 359 CRYPTOHOME_BUS_NAME = 'org.chromium.Cryptohome' 360 CRYPTOHOME_OBJECT_PATH = '/org/chromium/Cryptohome' 361 CRYPTOHOME_INTERFACE = 'org.chromium.CryptohomeInterface' 362 ASYNC_CALL_STATUS_SIGNAL = 'AsyncCallStatus' 363 ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS = ( 364 'async_id', 'return_status', 'return_code' 365 ) 366 DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties' 367 368 369 def __init__(self, bus_loop=None): 370 self.main_loop = gobject.MainLoop() 371 if bus_loop is None: 372 bus_loop = DBusGMainLoop(set_as_default=True) 373 self.bus = dbus.SystemBus(mainloop=bus_loop) 374 super(CryptohomeProxy, self).__init__(self.main_loop, self.bus, 375 self.CRYPTOHOME_BUS_NAME, 376 self.CRYPTOHOME_OBJECT_PATH) 377 self.iface = dbus.Interface(self.proxy_object, 378 self.CRYPTOHOME_INTERFACE) 379 self.properties = dbus.Interface(self.proxy_object, 380 self.DBUS_PROPERTIES_INTERFACE) 381 self.handle_signal(self.CRYPTOHOME_INTERFACE, 382 self.ASYNC_CALL_STATUS_SIGNAL, 383 self.ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS) 384 385 386 # Wrap all proxied calls to catch cryptohomed failures. 387 def __call(self, method, *args): 388 try: 389 return method(*args, timeout=180) 390 except dbus.exceptions.DBusException, e: 391 if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply': 392 logging.error('Cryptohome is not responding. Sending ABRT') 393 crash_cryptohomed() 394 raise ChromiumOSError('cryptohomed aborted. Check crashes!') 395 raise e 396 397 398 def __wait_for_specific_signal(self, signal, data): 399 """Wait for the |signal| with matching |data| 400 Returns the resulting dict on success or {} on error. 401 """ 402 # Do not bubble up the timeout here, just return {}. 403 result = {} 404 try: 405 result = self.wait_for_signal(signal) 406 except utils.TimeoutError: 407 return {} 408 for k in data.keys(): 409 if not result.has_key(k) or result[k] != data[k]: 410 return {} 411 return result 412 413 414 # Perform a data-less async call. 415 # TODO(wad) Add __async_data_call. 416 def __async_call(self, method, *args): 417 # Clear out any superfluous async call signals. 418 self.clear_signal_content(self.ASYNC_CALL_STATUS_SIGNAL) 419 out = self.__call(method, *args) 420 logging.debug('Issued call ' + str(method) + 421 ' with async_id ' + str(out)) 422 result = {} 423 try: 424 # __wait_for_specific_signal has a 10s timeout 425 result = utils.poll_for_condition( 426 lambda: self.__wait_for_specific_signal( 427 self.ASYNC_CALL_STATUS_SIGNAL, {'async_id' : out}), 428 timeout=180, 429 desc='matching %s signal' % self.ASYNC_CALL_STATUS_SIGNAL) 430 except utils.TimeoutError, e: 431 logging.error('Cryptohome timed out. Sending ABRT.') 432 crash_cryptohomed() 433 raise ChromiumOSError('cryptohomed aborted. Check crashes!') 434 return result 435 436 437 def mount(self, user, password, create=False, async=True): 438 """Mounts a cryptohome. 439 440 Returns True if the mount succeeds or False otherwise. 441 TODO(ellyjones): Migrate mount_vault() to use a multi-user-safe 442 heuristic, then remove this method. See <crosbug.com/20778>. 443 """ 444 if async: 445 return self.__async_call(self.iface.AsyncMount, user, password, 446 create, False, [])['return_status'] 447 out = self.__call(self.iface.Mount, user, password, create, False, []) 448 # Sync returns (return code, return status) 449 return out[1] if len(out) > 1 else False 450 451 452 def unmount(self, user): 453 """Unmounts a cryptohome. 454 455 Returns True if the unmount suceeds or false otherwise. 456 TODO(ellyjones): Once there's a per-user unmount method, use it. See 457 <crosbug.com/20778>. 458 """ 459 return self.__call(self.iface.Unmount) 460 461 462 def is_mounted(self, user): 463 """Tests whether a user's cryptohome is mounted.""" 464 return (utils.is_mountpoint(user_path(user)) 465 and utils.is_mountpoint(system_path(user))) 466 467 468 def require_mounted(self, user): 469 """Raises a test failure if a user's cryptohome is not mounted.""" 470 utils.require_mountpoint(user_path(user)) 471 utils.require_mountpoint(system_path(user)) 472 473 474 def migrate(self, user, oldkey, newkey, async=True): 475 """Migrates the specified user's cryptohome from one key to another.""" 476 if async: 477 return self.__async_call(self.iface.AsyncMigrateKey, 478 user, oldkey, newkey)['return_status'] 479 return self.__call(self.iface.MigrateKey, user, oldkey, newkey) 480 481 482 def remove(self, user, async=True): 483 if async: 484 return self.__async_call(self.iface.AsyncRemove, 485 user)['return_status'] 486 return self.__call(self.iface.Remove, user) 487 488 489 def ensure_clean_cryptohome_for(self, user, password=None): 490 """Ensure a fresh cryptohome exists for user. 491 492 @param user: user who needs a shiny new cryptohome. 493 @param password: if unset, a random password will be used. 494 """ 495 if not password: 496 password = ''.join(random.sample(string.ascii_lowercase, 6)) 497 self.remove(user) 498 self.mount(user, password, create=True) 499