1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import dbus, gobject, logging, os, random, re, shutil, string, time 6from dbus.mainloop.glib import DBusGMainLoop 7 8import common, constants 9from autotest_lib.client.bin import utils 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.cros.cros_disks import DBusClient 12 13CRYPTOHOME_CMD = '/usr/sbin/cryptohome' 14GUEST_USER_NAME = '$guest' 15UNAVAILABLE_ACTION = 'Unknown action or no action given.' 16MOUNT_RETRY_COUNT = 20 17 18class ChromiumOSError(error.TestError): 19 """Generic error for ChromiumOS-specific exceptions.""" 20 pass 21 22def __run_cmd(cmd): 23 return utils.system_output(cmd + ' 2>&1', retain_output=True, 24 ignore_status=True).strip() 25 26def get_user_hash(user): 27 """Get the user hash for the given user.""" 28 return utils.system_output(['cryptohome', '--action=obfuscate_user', 29 '--user=%s' % user]) 30 31 32def user_path(user): 33 """Get the user mount point for the given user.""" 34 return utils.system_output(['cryptohome-path', 'user', user]) 35 36 37def system_path(user): 38 """Get the system mount point for the given user.""" 39 return utils.system_output(['cryptohome-path', 'system', user]) 40 41 42def ensure_clean_cryptohome_for(user, password=None): 43 """Ensure a fresh cryptohome exists for user. 44 45 @param user: user who needs a shiny new cryptohome. 46 @param password: if unset, a random password will be used. 47 """ 48 if not password: 49 password = ''.join(random.sample(string.ascii_lowercase, 6)) 50 remove_vault(user) 51 mount_vault(user, password, create=True) 52 53 54def get_tpm_status(): 55 """Get the TPM status. 56 57 Returns: 58 A TPM status dictionary, for example: 59 { 'Enabled': True, 60 'Owned': True, 61 'Being Owned': False, 62 'Ready': True, 63 'Password': '' 64 } 65 """ 66 out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_status') 67 status = {} 68 for field in ['Enabled', 'Owned', 'Being Owned', 'Ready']: 69 match = re.search('TPM %s: (true|false)' % field, out) 70 if not match: 71 raise ChromiumOSError('Invalid TPM status: "%s".' % out) 72 status[field] = match.group(1) == 'true' 73 match = re.search('TPM Password: (\w*)', out) 74 status['Password'] = '' 75 if match: 76 status['Password'] = match.group(1) 77 return status 78 79 80def get_tpm_more_status(): 81 """Get more of the TPM status. 82 83 Returns: 84 A TPM more status dictionary, for example: 85 { 'dictionary_attack_lockout_in_effect': False, 86 'attestation_prepared': False, 87 'boot_lockbox_finalized': False, 88 'enabled': True, 89 'owned': True, 90 'owner_password': '' 91 'dictionary_attack_counter': 0, 92 'dictionary_attack_lockout_seconds_remaining': 0, 93 'dictionary_attack_threshold': 10, 94 'attestation_enrolled': False, 95 'initialized': True, 96 'verified_boot_measured': False, 97 'install_lockbox_finalized': True 98 } 99 An empty dictionary is returned if the command is not supported. 100 """ 101 status = {} 102 out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_more_status | grep :') 103 if out.startswith(UNAVAILABLE_ACTION): 104 # --action=tpm_more_status only exists >= 41. 105 logging.info('Method not supported!') 106 return status 107 for line in out.splitlines(): 108 items = line.strip().split(':') 109 if items[1].strip() == 'false': 110 value = False 111 elif items[1].strip() == 'true': 112 value = True 113 elif items[1].strip().isdigit(): 114 value = int(items[1].strip()) 115 else: 116 value = items[1].strip(' "') 117 status[items[0]] = value 118 return status 119 120 121def is_tpm_lockout_in_effect(): 122 """Returns true if the TPM lockout is in effect; false otherwise.""" 123 status = get_tpm_more_status() 124 return status.get('dictionary_attack_lockout_in_effect', None) 125 126 127def get_login_status(): 128 """Query the login status 129 130 Returns: 131 A login status dictionary containing: 132 { 'owner_user_exists': True|False, 133 'boot_lockbox_finalized': True|False 134 } 135 """ 136 out = __run_cmd(CRYPTOHOME_CMD + ' --action=get_login_status') 137 status = {} 138 for field in ['owner_user_exists', 'boot_lockbox_finalized']: 139 match = re.search('%s: (true|false)' % field, out) 140 if not match: 141 raise ChromiumOSError('Invalid login status: "%s".' % out) 142 status[field] = match.group(1) == 'true' 143 return status 144 145 146def get_tpm_attestation_status(): 147 """Get the TPM attestation status. Works similar to get_tpm_status(). 148 """ 149 out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_attestation_status') 150 status = {} 151 for field in ['Prepared', 'Enrolled']: 152 match = re.search('Attestation %s: (true|false)' % field, out) 153 if not match: 154 raise ChromiumOSError('Invalid attestation status: "%s".' % out) 155 status[field] = match.group(1) == 'true' 156 return status 157 158 159def take_tpm_ownership(): 160 """Take TPM owernship. 161 162 Blocks until TPM is owned. 163 """ 164 __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_take_ownership') 165 __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_wait_ownership') 166 167 168def verify_ek(): 169 """Verify the TPM endorsement key. 170 171 Returns true if EK is valid. 172 """ 173 cmd = CRYPTOHOME_CMD + ' --action=tpm_verify_ek' 174 return (utils.system(cmd, ignore_status=True) == 0) 175 176 177def remove_vault(user): 178 """Remove the given user's vault from the shadow directory.""" 179 logging.debug('user is %s', user) 180 user_hash = get_user_hash(user) 181 logging.debug('Removing vault for user %s with hash %s', user, user_hash) 182 cmd = CRYPTOHOME_CMD + ' --action=remove --force --user=%s' % user 183 __run_cmd(cmd) 184 # Ensure that the vault does not exist. 185 if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 186 raise ChromiumOSError('Cryptohome could not remove the user\'s vault.') 187 188 189def remove_all_vaults(): 190 """Remove any existing vaults from the shadow directory. 191 192 This function must be run with root privileges. 193 """ 194 for item in os.listdir(constants.SHADOW_ROOT): 195 abs_item = os.path.join(constants.SHADOW_ROOT, item) 196 if os.path.isdir(os.path.join(abs_item, 'vault')): 197 logging.debug('Removing vault for user with hash %s', item) 198 shutil.rmtree(abs_item) 199 200 201def mount_vault(user, password, create=False): 202 """Mount the given user's vault.""" 203 args = [CRYPTOHOME_CMD, '--action=mount', '--user=%s' % user, 204 '--password=%s' % password, '--async'] 205 if create: 206 args.append('--create') 207 logging.info(__run_cmd(' '.join(args))) 208 # Ensure that the vault exists in the shadow directory. 209 user_hash = get_user_hash(user) 210 if not os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 211 retry = 0 212 mounted = False 213 while retry < MOUNT_RETRY_COUNT and not mounted: 214 time.sleep(1) 215 logging.info("Retry " + str(retry + 1)) 216 __run_cmd(' '.join(args)) 217 # TODO: Remove this additional call to get_user_hash(user) when 218 # crbug.com/690994 is fixed 219 user_hash = get_user_hash(user) 220 if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 221 mounted = True 222 retry += 1 223 if not mounted: 224 raise ChromiumOSError('Cryptohome vault not found after mount.') 225 # Ensure that the vault is mounted. 226 if not is_permanent_vault_mounted(user=user, allow_fail=True): 227 raise ChromiumOSError('Cryptohome created a vault but did not mount.') 228 229 230def mount_guest(): 231 """Mount the given user's vault.""" 232 args = [CRYPTOHOME_CMD, '--action=mount_guest', '--async'] 233 logging.info(__run_cmd(' '.join(args))) 234 # Ensure that the guest tmpfs is mounted. 235 if not is_guest_vault_mounted(allow_fail=True): 236 raise ChromiumOSError('Cryptohome did not mount tmpfs.') 237 238 239def test_auth(user, password): 240 cmd = [CRYPTOHOME_CMD, '--action=test_auth', '--user=%s' % user, 241 '--password=%s' % password, '--async'] 242 return 'Authentication succeeded' in utils.system_output(cmd) 243 244 245def unmount_vault(user): 246 """Unmount the given user's vault. 247 248 Once unmounting for a specific user is supported, the user parameter will 249 name the target user. See crosbug.com/20778. 250 """ 251 __run_cmd(CRYPTOHOME_CMD + ' --action=unmount') 252 # Ensure that the vault is not mounted. 253 if is_vault_mounted(user, allow_fail=True): 254 raise ChromiumOSError('Cryptohome did not unmount the user.') 255 256 257def __get_mount_info(mount_point, allow_fail=False): 258 """Get information about the active mount at a given mount point.""" 259 cryptohomed_path = '/proc/$(pgrep cryptohomed)/mounts' 260 try: 261 logging.debug("Active cryptohome mounts:\n" + 262 utils.system_output('cat %s' % cryptohomed_path)) 263 mount_line = utils.system_output( 264 'grep %s %s' % (mount_point, cryptohomed_path), 265 ignore_status=allow_fail) 266 except Exception as e: 267 logging.error(e) 268 raise ChromiumOSError('Could not get info about cryptohome vault ' 269 'through %s. See logs for complete mount-point.' 270 % os.path.dirname(str(mount_point))) 271 return mount_line.split() 272 273 274def __get_user_mount_info(user, allow_fail=False): 275 """Get information about the active mounts for a given user. 276 277 Returns the active mounts at the user's user and system mount points. If no 278 user is given, the active mount at the shared mount point is returned 279 (regular users have a bind-mount at this mount point for backwards 280 compatibility; the guest user has a mount at this mount point only). 281 """ 282 return [__get_mount_info(mount_point=user_path(user), 283 allow_fail=allow_fail), 284 __get_mount_info(mount_point=system_path(user), 285 allow_fail=allow_fail)] 286 287def is_vault_mounted(user, regexes=None, allow_fail=False): 288 """Check whether a vault is mounted for the given user. 289 290 user: If no user is given, the shared mount point is checked, determining 291 whether a vault is mounted for any user. 292 regexes: dictionary of regexes to matches against the mount information. 293 The mount filesystem for the user's user and system mounts point must 294 match one of the keys. 295 The mount source point must match the selected device regex. 296 297 In addition, if mounted over ext4, we check the directory is encrypted. 298 """ 299 if regexes is None: 300 regexes = { 301 constants.CRYPTOHOME_FS_REGEX_ANY : 302 constants.CRYPTOHOME_DEV_REGEX_ANY 303 } 304 user_mount_info = __get_user_mount_info(user=user, allow_fail=allow_fail) 305 for mount_info in user_mount_info: 306 # Look at each /proc/../mount lines that match mount point for a given 307 # user user/system mount (/home/user/.... /home/root/...) 308 309 # We should have at least 3 arguments (source, mount, type of mount) 310 if len(mount_info) < 3: 311 return False 312 313 device_regex = None 314 for fs_regex in regexes.keys(): 315 if re.match(fs_regex, mount_info[2]): 316 device_regex = regexes[fs_regex] 317 break 318 319 if not device_regex: 320 # The thrid argument in not the expectd mount point type. 321 return False 322 323 # Check if the mount source match the device regex: it can be loose, 324 # (anything) or stricter if we expect guest filesystem. 325 if not re.match(device_regex, mount_info[0]): 326 return False 327 328 if re.match(constants.CRYPTOHOME_FS_REGEX_EXT4, mount_info[2]): 329 # We are using ext4 crypto. Check there is an encryption key for 330 # that directory. 331 find_key_cmd_list = ['e4crypt get_policy %s' % (mount_info[1]), 332 'cut -d \' \' -f 2'] 333 key = __run_cmd(' | ' .join(find_key_cmd_list)) 334 cmd_list = ['keyctl show @s', 335 'grep %s' % (key), 336 'wc -l'] 337 out = __run_cmd(' | '.join(cmd_list)) 338 if int(out) != 1: 339 return False 340 return True 341 342 343def is_guest_vault_mounted(allow_fail=False): 344 """Check whether a vault backed by tmpfs is mounted for the guest user.""" 345 return is_vault_mounted( 346 user=GUEST_USER_NAME, 347 regexes={ 348 constants.CRYPTOHOME_FS_REGEX_TMPFS : 349 constants.CRYPTOHOME_DEV_REGEX_GUEST, 350 }, 351 allow_fail=allow_fail) 352 353def is_permanent_vault_mounted(user, allow_fail=False): 354 """Check if user is mounted over ecryptfs or ext4 crypto. """ 355 return is_vault_mounted( 356 user=user, 357 regexes={ 358 constants.CRYPTOHOME_FS_REGEX_ECRYPTFS : 359 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_SHADOW, 360 constants.CRYPTOHOME_FS_REGEX_EXT4 : 361 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_DEVICE, 362 }, 363 allow_fail=allow_fail) 364 365def get_mounted_vault_path(user, allow_fail=False): 366 """Get the path where the decrypted data for the user is located.""" 367 return os.path.join(constants.SHADOW_ROOT, get_user_hash(user), 'mount') 368 369 370def canonicalize(credential): 371 """Perform basic canonicalization of |email_address|. 372 373 Perform basic canonicalization of |email_address|, taking into account that 374 gmail does not consider '.' or caps inside a username to matter. It also 375 ignores everything after a '+'. For example, 376 c.masone+abc@gmail.com == cMaSone@gmail.com, per 377 http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313 378 """ 379 if not credential: 380 return None 381 382 parts = credential.split('@') 383 if len(parts) != 2: 384 raise error.TestError('Malformed email: ' + credential) 385 386 (name, domain) = parts 387 name = name.partition('+')[0] 388 if (domain == constants.SPECIAL_CASE_DOMAIN): 389 name = name.replace('.', '') 390 return '@'.join([name, domain]).lower() 391 392 393def crash_cryptohomed(): 394 # Try to kill cryptohomed so we get something to work with. 395 pid = __run_cmd('pgrep cryptohomed') 396 try: 397 pid = int(pid) 398 except ValueError, e: # empty or invalid string 399 raise error.TestError('Cryptohomed was not running') 400 utils.system('kill -ABRT %d' % pid) 401 # CONT just in case cryptohomed had a spurious STOP. 402 utils.system('kill -CONT %d' % pid) 403 utils.poll_for_condition( 404 lambda: utils.system('ps -p %d' % pid, 405 ignore_status=True) != 0, 406 timeout=180, 407 exception=error.TestError( 408 'Timeout waiting for cryptohomed to coredump')) 409 410 411class CryptohomeProxy(DBusClient): 412 """A DBus proxy client for testing the Cryptohome DBus server. 413 """ 414 CRYPTOHOME_BUS_NAME = 'org.chromium.Cryptohome' 415 CRYPTOHOME_OBJECT_PATH = '/org/chromium/Cryptohome' 416 CRYPTOHOME_INTERFACE = 'org.chromium.CryptohomeInterface' 417 ASYNC_CALL_STATUS_SIGNAL = 'AsyncCallStatus' 418 ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS = ( 419 'async_id', 'return_status', 'return_code' 420 ) 421 DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties' 422 423 424 def __init__(self, bus_loop=None): 425 self.main_loop = gobject.MainLoop() 426 if bus_loop is None: 427 bus_loop = DBusGMainLoop(set_as_default=True) 428 self.bus = dbus.SystemBus(mainloop=bus_loop) 429 super(CryptohomeProxy, self).__init__(self.main_loop, self.bus, 430 self.CRYPTOHOME_BUS_NAME, 431 self.CRYPTOHOME_OBJECT_PATH) 432 self.iface = dbus.Interface(self.proxy_object, 433 self.CRYPTOHOME_INTERFACE) 434 self.properties = dbus.Interface(self.proxy_object, 435 self.DBUS_PROPERTIES_INTERFACE) 436 self.handle_signal(self.CRYPTOHOME_INTERFACE, 437 self.ASYNC_CALL_STATUS_SIGNAL, 438 self.ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS) 439 440 441 # Wrap all proxied calls to catch cryptohomed failures. 442 def __call(self, method, *args): 443 try: 444 return method(*args, timeout=180) 445 except dbus.exceptions.DBusException, e: 446 if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply': 447 logging.error('Cryptohome is not responding. Sending ABRT') 448 crash_cryptohomed() 449 raise ChromiumOSError('cryptohomed aborted. Check crashes!') 450 raise e 451 452 453 def __wait_for_specific_signal(self, signal, data): 454 """Wait for the |signal| with matching |data| 455 Returns the resulting dict on success or {} on error. 456 """ 457 # Do not bubble up the timeout here, just return {}. 458 result = {} 459 try: 460 result = self.wait_for_signal(signal) 461 except utils.TimeoutError: 462 return {} 463 for k in data.keys(): 464 if not result.has_key(k) or result[k] != data[k]: 465 return {} 466 return result 467 468 469 # Perform a data-less async call. 470 # TODO(wad) Add __async_data_call. 471 def __async_call(self, method, *args): 472 # Clear out any superfluous async call signals. 473 self.clear_signal_content(self.ASYNC_CALL_STATUS_SIGNAL) 474 out = self.__call(method, *args) 475 logging.debug('Issued call ' + str(method) + 476 ' with async_id ' + str(out)) 477 result = {} 478 try: 479 # __wait_for_specific_signal has a 10s timeout 480 result = utils.poll_for_condition( 481 lambda: self.__wait_for_specific_signal( 482 self.ASYNC_CALL_STATUS_SIGNAL, {'async_id' : out}), 483 timeout=180, 484 desc='matching %s signal' % self.ASYNC_CALL_STATUS_SIGNAL) 485 except utils.TimeoutError, e: 486 logging.error('Cryptohome timed out. Sending ABRT.') 487 crash_cryptohomed() 488 raise ChromiumOSError('cryptohomed aborted. Check crashes!') 489 return result 490 491 492 def mount(self, user, password, create=False, async=True): 493 """Mounts a cryptohome. 494 495 Returns True if the mount succeeds or False otherwise. 496 TODO(ellyjones): Migrate mount_vault() to use a multi-user-safe 497 heuristic, then remove this method. See <crosbug.com/20778>. 498 """ 499 if async: 500 return self.__async_call(self.iface.AsyncMount, user, password, 501 create, False, [])['return_status'] 502 out = self.__call(self.iface.Mount, user, password, create, False, []) 503 # Sync returns (return code, return status) 504 return out[1] if len(out) > 1 else False 505 506 507 def unmount(self, user): 508 """Unmounts a cryptohome. 509 510 Returns True if the unmount suceeds or false otherwise. 511 TODO(ellyjones): Once there's a per-user unmount method, use it. See 512 <crosbug.com/20778>. 513 """ 514 return self.__call(self.iface.Unmount) 515 516 517 def is_mounted(self, user): 518 """Tests whether a user's cryptohome is mounted.""" 519 return (utils.is_mountpoint(user_path(user)) 520 and utils.is_mountpoint(system_path(user))) 521 522 523 def require_mounted(self, user): 524 """Raises a test failure if a user's cryptohome is not mounted.""" 525 utils.require_mountpoint(user_path(user)) 526 utils.require_mountpoint(system_path(user)) 527 528 529 def migrate(self, user, oldkey, newkey, async=True): 530 """Migrates the specified user's cryptohome from one key to another.""" 531 if async: 532 return self.__async_call(self.iface.AsyncMigrateKey, 533 user, oldkey, newkey)['return_status'] 534 return self.__call(self.iface.MigrateKey, user, oldkey, newkey) 535 536 537 def remove(self, user, async=True): 538 if async: 539 return self.__async_call(self.iface.AsyncRemove, 540 user)['return_status'] 541 return self.__call(self.iface.Remove, user) 542 543 544 def ensure_clean_cryptohome_for(self, user, password=None): 545 """Ensure a fresh cryptohome exists for user. 546 547 @param user: user who needs a shiny new cryptohome. 548 @param password: if unset, a random password will be used. 549 """ 550 if not password: 551 password = ''.join(random.sample(string.ascii_lowercase, 6)) 552 self.remove(user) 553 self.mount(user, password, create=True) 554