1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus, gobject, logging, os, random, re, shutil, string, time
6from dbus.mainloop.glib import DBusGMainLoop
7
8import common, constants
9from autotest_lib.client.bin import utils
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.cros.cros_disks import DBusClient
12
# Path of the cryptohome command-line client invoked by the helpers below.
CRYPTOHOME_CMD = '/usr/sbin/cryptohome'
# User name passed to is_vault_mounted() when checking the guest mount.
GUEST_USER_NAME = '$guest'
# Output prefix that get_tpm_more_status() treats as "action not supported".
UNAVAILABLE_ACTION = 'Unknown action or no action given.'
# Upper bound on the number of extra mount attempts made by mount_vault().
MOUNT_RETRY_COUNT = 20
17
class ChromiumOSError(error.TestError):
    """Raised by the helpers in this module when a cryptohome step fails.

    It is a plain subclass of error.TestError and adds no behavior.
    """
21
def __run_cmd(cmd):
    """Run |cmd| with ' 2>&1' appended and return its stripped output.

    The exit status of the command is ignored.

    @param cmd: command line, as a single string.
    @return: the captured output with surrounding whitespace removed.
    """
    output = utils.system_output(cmd + ' 2>&1',
                                 retain_output=True,
                                 ignore_status=True)
    return output.strip()
25
def get_user_hash(user):
    """Return the obfuscated name cryptohome reports for |user|.

    @param user: user name handed to `cryptohome --action=obfuscate_user`.
    @return: the output of that command.
    """
    obfuscate_cmd = ['cryptohome', '--action=obfuscate_user']
    obfuscate_cmd.append('--user=%s' % user)
    return utils.system_output(obfuscate_cmd)
30
31
def user_path(user):
    """Return the user mount point reported by `cryptohome-path user`.

    @param user: user name to look up.
    @return: the output of the cryptohome-path command.
    """
    path_cmd = ['cryptohome-path', 'user', user]
    return utils.system_output(path_cmd)
35
36
def system_path(user):
    """Return the system mount point reported by `cryptohome-path system`.

    @param user: user name to look up.
    @return: the output of the cryptohome-path command.
    """
    path_cmd = ['cryptohome-path', 'system', user]
    return utils.system_output(path_cmd)
40
41
def ensure_clean_cryptohome_for(user, password=None):
    """Remove any existing vault for |user| and mount a newly created one.

    @param user: user who needs a fresh cryptohome.
    @param password: password for the new vault; when empty or None, six
            distinct random lowercase letters are used instead.
    """
    if not password:
        letters = random.sample(string.ascii_lowercase, 6)
        password = ''.join(letters)
    remove_vault(user)
    mount_vault(user, password, create=True)
52
53
def get_tpm_status():
    """Query `cryptohome --action=tpm_status` and parse the result.

    @return: a dictionary such as
        { 'Enabled': True,
          'Owned': True,
          'Being Owned': False,
          'Ready': True,
          'Password': ''
        }
        'Password' is the empty string when the output has no password line.
    @raise ChromiumOSError: if any of the boolean fields is missing from the
            command output.
    """
    out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_status')
    status = {}
    for field in ('Enabled', 'Owned', 'Being Owned', 'Ready'):
        found = re.search('TPM %s: (true|false)' % field, out)
        if found is None:
            raise ChromiumOSError('Invalid TPM status: "%s".' % out)
        status[field] = (found.group(1) == 'true')
    # The password line is optional; default to an empty password.
    password = re.search(r'TPM Password: (\w*)', out)
    status['Password'] = password.group(1) if password else ''
    return status
78
79
def get_tpm_more_status():
    """Query `cryptohome --action=tpm_more_status` and parse the result.

    Every "key: value" line of the output becomes one dictionary entry. The
    values 'true' and 'false' become booleans, all-digit values become ints,
    and anything else is kept as a string with surrounding spaces and double
    quotes removed.

    @return: a dictionary such as
        { 'dictionary_attack_lockout_in_effect': False,
          'attestation_prepared': False,
          'boot_lockbox_finalized': False,
          'enabled': True,
          'owned': True,
          'owner_password': '',
          'dictionary_attack_counter': 0,
          'dictionary_attack_lockout_seconds_remaining': 0,
          'dictionary_attack_threshold': 10,
          'attestation_enrolled': False,
          'initialized': True,
          'verified_boot_measured': False,
          'install_lockbox_finalized': True
        }
        An empty dictionary is returned if the command is not supported.
    """
    status = {}
    out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_more_status | grep :')
    if out.startswith(UNAVAILABLE_ACTION):
        # --action=tpm_more_status only exists >= 41.
        logging.info('Method not supported!')
        return status
    for line in out.splitlines():
        # Split on the first colon only, so that a value which itself
        # contains ':' is kept whole instead of being truncated.
        key, separator, raw_value = line.strip().partition(':')
        if not separator:
            # Not a "key: value" line; nothing to record.
            continue
        raw_value = raw_value.strip()
        if raw_value == 'false':
            value = False
        elif raw_value == 'true':
            value = True
        elif raw_value.isdigit():
            value = int(raw_value)
        else:
            value = raw_value.strip(' "')
        status[key] = value
    return status
119
120
def is_tpm_lockout_in_effect():
    """Report whether the TPM dictionary-attack lockout is in effect.

    @return: the 'dictionary_attack_lockout_in_effect' entry of
            get_tpm_more_status(), or None when that entry is absent.
    """
    return get_tpm_more_status().get('dictionary_attack_lockout_in_effect')
125
126
def get_login_status():
    """Query `cryptohome --action=get_login_status` and parse the result.

    @return: a dictionary containing:
        { 'owner_user_exists': True|False,
          'boot_lockbox_finalized': True|False
        }
    @raise ChromiumOSError: if either field is missing from the output.
    """
    out = __run_cmd(CRYPTOHOME_CMD + ' --action=get_login_status')
    status = {}
    for field in ('owner_user_exists', 'boot_lockbox_finalized'):
        found = re.search('%s: (true|false)' % field, out)
        if found is None:
            raise ChromiumOSError('Invalid login status: "%s".' % out)
        status[field] = (found.group(1) == 'true')
    return status
144
145
def get_tpm_attestation_status():
    """Query `cryptohome --action=tpm_attestation_status` and parse it.

    @return: a dictionary with boolean 'Prepared' and 'Enrolled' entries.
    @raise ChromiumOSError: if either field is missing from the output.
    """
    out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_attestation_status')
    status = {}
    for field in ('Prepared', 'Enrolled'):
        found = re.search('Attestation %s: (true|false)' % field, out)
        if found is None:
            raise ChromiumOSError('Invalid attestation status: "%s".' % out)
        status[field] = (found.group(1) == 'true')
    return status
157
158
def take_tpm_ownership():
    """Take TPM ownership.

    Runs the tpm_take_ownership action followed by tpm_wait_ownership; the
    output of both commands is discarded.
    """
    for action in ('tpm_take_ownership', 'tpm_wait_ownership'):
        __run_cmd(CRYPTOHOME_CMD + ' --action=' + action)
166
167
def verify_ek():
    """Verify the TPM endorsement key.

    @return: True if `cryptohome --action=tpm_verify_ek` exits with status 0,
            False otherwise.
    """
    exit_status = utils.system(CRYPTOHOME_CMD + ' --action=tpm_verify_ek',
                               ignore_status=True)
    return exit_status == 0
175
176
def remove_vault(user):
    """Remove the vault of |user| from the shadow directory.

    @param user: user whose vault is removed.
    @raise ChromiumOSError: if the user's shadow directory still exists after
            the remove command has run.
    """
    logging.debug('user is %s', user)
    user_hash = get_user_hash(user)
    logging.debug('Removing vault for user %s with hash %s', user, user_hash)
    remove_args = ' --action=remove --force --user=%s' % user
    __run_cmd(CRYPTOHOME_CMD + remove_args)
    # The remove command's output is not inspected; check the disk instead.
    vault_dir = os.path.join(constants.SHADOW_ROOT, user_hash)
    if os.path.exists(vault_dir):
        raise ChromiumOSError("Cryptohome could not remove the user's vault.")
187
188
def remove_all_vaults():
    """Delete every entry of the shadow directory that holds a vault.

    An entry is deleted, with everything under it, when it contains a
    'vault' subdirectory.

    This function must be run with root privileges.
    """
    for entry in os.listdir(constants.SHADOW_ROOT):
        entry_path = os.path.join(constants.SHADOW_ROOT, entry)
        if not os.path.isdir(os.path.join(entry_path, 'vault')):
            continue
        logging.debug('Removing vault for user with hash %s', entry)
        shutil.rmtree(entry_path)
199
200
def mount_vault(user, password, create=False):
    """Mount the vault of |user| with the cryptohome command-line client.

    If the user's shadow directory is missing after the first attempt, the
    mount command is re-issued once per second, up to MOUNT_RETRY_COUNT times.

    @param user: user whose vault is mounted.
    @param password: password passed to the mount command.
    @param create: when True, '--create' is added to the mount command.
    @raise ChromiumOSError: if the shadow directory never appears, or if the
            vault is not mounted afterwards.
    """
    mount_args = [CRYPTOHOME_CMD, '--action=mount', '--user=%s' % user,
                  '--password=%s' % password, '--async']
    if create:
        mount_args.append('--create')
    mount_cmd = ' '.join(mount_args)
    logging.info(__run_cmd(mount_cmd))

    # Ensure that the vault exists in the shadow directory.
    user_hash = get_user_hash(user)
    if not os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
        for attempt in range(1, MOUNT_RETRY_COUNT + 1):
            time.sleep(1)
            logging.info("Retry " + str(attempt))
            __run_cmd(mount_cmd)
            # TODO: Remove this additional call to get_user_hash(user) when
            # crbug.com/690994 is fixed
            user_hash = get_user_hash(user)
            if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
                break
        else:
            # Every retry ran without the shadow directory showing up.
            raise ChromiumOSError('Cryptohome vault not found after mount.')

    # Ensure that the vault is mounted.
    if not is_permanent_vault_mounted(user=user, allow_fail=True):
        raise ChromiumOSError('Cryptohome created a vault but did not mount.')
228
229
def mount_guest():
    """Mount the guest cryptohome with the cryptohome command-line client.

    @raise ChromiumOSError: if the guest vault is not mounted afterwards.
    """
    guest_cmd = ' '.join([CRYPTOHOME_CMD, '--action=mount_guest', '--async'])
    logging.info(__run_cmd(guest_cmd))
    # Ensure that the guest tmpfs is mounted.
    if is_guest_vault_mounted(allow_fail=True):
        return
    raise ChromiumOSError('Cryptohome did not mount tmpfs.')
237
238
def test_auth(user, password):
    """Run `cryptohome --action=test_auth` for |user| and |password|.

    @param user: user name passed to the command.
    @param password: password passed to the command.
    @return: True if the command output contains 'Authentication succeeded'.
    """
    auth_cmd = [CRYPTOHOME_CMD, '--action=test_auth']
    auth_cmd += ['--user=%s' % user, '--password=%s' % password, '--async']
    output = utils.system_output(auth_cmd)
    return 'Authentication succeeded' in output
243
244
def unmount_vault(user):
    """Unmount cryptohome and check that the vault of |user| is gone.

    The unmount command itself takes no user. Once unmounting for a specific
    user is supported, the user parameter will name the target user. See
    crosbug.com/20778.

    @param user: user whose vault must no longer be mounted afterwards.
    @raise ChromiumOSError: if a vault is still mounted for |user|.
    """
    __run_cmd(CRYPTOHOME_CMD + ' --action=unmount')
    # Ensure that the vault is not mounted.
    still_mounted = is_vault_mounted(user, allow_fail=True)
    if still_mounted:
        raise ChromiumOSError('Cryptohome did not unmount the user.')
255
256
def __get_mount_info(mount_point, allow_fail=False):
    """Return the cryptohomed mount entry that matches |mount_point|.

    The whole mount table of cryptohomed is logged at debug level, then
    grepped for |mount_point|.

    @param mount_point: pattern handed to grep.
    @param allow_fail: when True, the exit status of grep is ignored.
    @return: the grep output split on whitespace (an empty list when nothing
            was printed).
    @raise ChromiumOSError: if reading or grepping the mount table raises.
    """
    cryptohomed_path = '/proc/$(pgrep cryptohomed)/mounts'
    grep_cmd = 'grep %s %s' % (mount_point, cryptohomed_path)
    try:
        all_mounts = utils.system_output('cat %s' % cryptohomed_path)
        logging.debug("Active cryptohome mounts:\n" + all_mounts)
        mount_line = utils.system_output(grep_cmd, ignore_status=allow_fail)
    except Exception as e:
        logging.error(e)
        raise ChromiumOSError('Could not get info about cryptohome vault '
                              'through %s. See logs for complete mount-point.'
                              % os.path.dirname(str(mount_point)))
    return mount_line.split()
272
273
def __get_user_mount_info(user, allow_fail=False):
    """Return the active mount entries for the mount points of |user|.

    @param user: user whose user and system mount points are looked up.
    @param allow_fail: forwarded to __get_mount_info().
    @return: a two-element list: the mount entry for user_path(user) followed
            by the mount entry for system_path(user).
    """
    return [__get_mount_info(mount_point=resolve(user), allow_fail=allow_fail)
            for resolve in (user_path, system_path)]
286
def is_vault_mounted(user, regexes=None, allow_fail=False):
    """Check whether a vault is mounted for the given user.

    Both the user mount and the system mount of |user| must pass every check
    below for the vault to count as mounted.

    @param user: user whose mount points are inspected.
    @param regexes: dictionary mapping a filesystem-type regex to the regex
            the mount source must match for that filesystem. The filesystem
            type of each mount must match one of the keys. Defaults to the
            "any filesystem" / "any device" pair from constants.
    @param allow_fail: forwarded to the mount lookup.
    @return: True if both mounts pass, False otherwise.

    In addition, if mounted over ext4, we check the directory is encrypted.
    """
    if regexes is None:
        regexes = {
            constants.CRYPTOHOME_FS_REGEX_ANY:
                constants.CRYPTOHOME_DEV_REGEX_ANY,
        }
    # Each entry holds the fields of the /proc/.../mounts line that matched
    # one of the user's mount points (/home/user/..., /home/root/...).
    for mount_fields in __get_user_mount_info(user=user,
                                              allow_fail=allow_fail):
        # We need at least 3 fields (source, mount point, type of mount).
        if len(mount_fields) < 3:
            return False
        source, target, fs_type = mount_fields[:3]

        # Pick the device regex that belongs to the first matching fs regex.
        device_regex = None
        for fs_regex in regexes:
            if re.match(fs_regex, fs_type):
                device_regex = regexes[fs_regex]
                break
        if not device_regex:
            # The third field is not an expected mount type.
            return False

        # The mount source must match the device regex: it can be loose
        # (anything) or stricter if we expect a guest filesystem.
        if not re.match(device_regex, source):
            return False

        if not re.match(constants.CRYPTOHOME_FS_REGEX_EXT4, fs_type):
            continue

        # We are using ext4 crypto. Check there is an encryption key for
        # that directory in the session keyring.
        policy_cmd = ' | '.join(['e4crypt  get_policy %s' % target,
                                 "cut -d ' ' -f 2"])
        key = __run_cmd(policy_cmd)
        keyring_cmd = ' | '.join(['keyctl show @s',
                                  'grep %s' % key,
                                  'wc -l'])
        if int(__run_cmd(keyring_cmd)) != 1:
            return False
    return True
341
342
def is_guest_vault_mounted(allow_fail=False):
    """Check whether a tmpfs-backed vault is mounted for the guest user.

    @param allow_fail: forwarded to is_vault_mounted().
    @return: the result of is_vault_mounted() for GUEST_USER_NAME.
    """
    guest_regexes = {
        constants.CRYPTOHOME_FS_REGEX_TMPFS:
            constants.CRYPTOHOME_DEV_REGEX_GUEST,
    }
    return is_vault_mounted(user=GUEST_USER_NAME, regexes=guest_regexes,
                            allow_fail=allow_fail)
352
def is_permanent_vault_mounted(user, allow_fail=False):
    """Check if |user| is mounted over ecryptfs or ext4 crypto.

    @param user: user whose mounts are checked.
    @param allow_fail: forwarded to is_vault_mounted().
    @return: the result of is_vault_mounted() for the two permanent
            filesystem/device regex pairs.
    """
    permanent_regexes = {
        constants.CRYPTOHOME_FS_REGEX_ECRYPTFS:
            constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_SHADOW,
        constants.CRYPTOHOME_FS_REGEX_EXT4:
            constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_DEVICE,
    }
    return is_vault_mounted(user=user, regexes=permanent_regexes,
                            allow_fail=allow_fail)
364
def get_mounted_vault_path(user, allow_fail=False):
    """Return the 'mount' directory under the shadow directory of |user|.

    @param user: user whose hash names the shadow directory.
    @param allow_fail: accepted but not used by this function.
    @return: <SHADOW_ROOT>/<user hash>/mount
    """
    user_hash = get_user_hash(user)
    return os.path.join(constants.SHADOW_ROOT, user_hash, 'mount')
368
369
def canonicalize(credential):
    """Perform basic canonicalization of the email address |credential|.

    Everything from the first '+' in the user name onwards is dropped and the
    result is lower-cased. For the special-case domain, '.' characters in the
    user name are dropped as well, taking into account that gmail does not
    consider '.' or caps inside a username to matter. For example,
    c.masone+abc@gmail.com == cMaSone@gmail.com, per
    http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313

    @param credential: email address to canonicalize.
    @return: the canonical address, or None if |credential| is empty or None.
    @raise error.TestError: if |credential| does not contain exactly one '@'.
    """
    if not credential:
        return None

    pieces = credential.split('@')
    if len(pieces) != 2:
        raise error.TestError('Malformed email: ' + credential)

    local_part, domain = pieces
    local_part = local_part.partition('+')[0]
    # NOTE(review): the domain is compared before lower-casing, so the match
    # against the special-case domain is case-sensitive -- confirm intended.
    if domain == constants.SPECIAL_CASE_DOMAIN:
        local_part = local_part.replace('.', '')
    return '@'.join((local_part, domain)).lower()
391
392
def crash_cryptohomed():
    """Send SIGABRT to cryptohomed and wait for the process to go away.

    The pid is taken from `pgrep cryptohomed`. After the ABRT, a CONT is sent
    as well, then `ps -p <pid>` is polled for up to 180 seconds until it
    reports that the process no longer exists.

    @raise error.TestError: if the pgrep output is not a single integer
            (reported as cryptohomed not running), or if the process is still
            present when the poll times out.
    """
    # Try to kill cryptohomed so we get something to work with.
    pid = __run_cmd('pgrep cryptohomed')
    try:
        pid = int(pid)
    except ValueError:  # empty or invalid string
        raise error.TestError('Cryptohomed was not running')
    utils.system('kill -ABRT %d' % pid)
    # CONT just in case cryptohomed had a spurious STOP.
    utils.system('kill -CONT %d' % pid)
    utils.poll_for_condition(
        lambda: utils.system('ps -p %d' % pid, ignore_status=True) != 0,
        timeout=180,
        exception=error.TestError(
            'Timeout waiting for cryptohomed to coredump'))
409
410
411class CryptohomeProxy(DBusClient):
412    """A DBus proxy client for testing the Cryptohome DBus server.
413    """
414    CRYPTOHOME_BUS_NAME = 'org.chromium.Cryptohome'
415    CRYPTOHOME_OBJECT_PATH = '/org/chromium/Cryptohome'
416    CRYPTOHOME_INTERFACE = 'org.chromium.CryptohomeInterface'
417    ASYNC_CALL_STATUS_SIGNAL = 'AsyncCallStatus'
418    ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS = (
419        'async_id', 'return_status', 'return_code'
420    )
421    DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
422
423
424    def __init__(self, bus_loop=None):
425        self.main_loop = gobject.MainLoop()
426        if bus_loop is None:
427            bus_loop = DBusGMainLoop(set_as_default=True)
428        self.bus = dbus.SystemBus(mainloop=bus_loop)
429        super(CryptohomeProxy, self).__init__(self.main_loop, self.bus,
430                                              self.CRYPTOHOME_BUS_NAME,
431                                              self.CRYPTOHOME_OBJECT_PATH)
432        self.iface = dbus.Interface(self.proxy_object,
433                                    self.CRYPTOHOME_INTERFACE)
434        self.properties = dbus.Interface(self.proxy_object,
435                                         self.DBUS_PROPERTIES_INTERFACE)
436        self.handle_signal(self.CRYPTOHOME_INTERFACE,
437                           self.ASYNC_CALL_STATUS_SIGNAL,
438                           self.ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS)
439
440
441    # Wrap all proxied calls to catch cryptohomed failures.
442    def __call(self, method, *args):
443        try:
444            return method(*args, timeout=180)
445        except dbus.exceptions.DBusException, e:
446            if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply':
447                logging.error('Cryptohome is not responding. Sending ABRT')
448                crash_cryptohomed()
449                raise ChromiumOSError('cryptohomed aborted. Check crashes!')
450            raise e
451
452
453    def __wait_for_specific_signal(self, signal, data):
454      """Wait for the |signal| with matching |data|
455         Returns the resulting dict on success or {} on error.
456      """
457      # Do not bubble up the timeout here, just return {}.
458      result = {}
459      try:
460          result = self.wait_for_signal(signal)
461      except utils.TimeoutError:
462          return {}
463      for k in data.keys():
464          if not result.has_key(k) or result[k] != data[k]:
465            return {}
466      return result
467
468
469    # Perform a data-less async call.
470    # TODO(wad) Add __async_data_call.
471    def __async_call(self, method, *args):
472        # Clear out any superfluous async call signals.
473        self.clear_signal_content(self.ASYNC_CALL_STATUS_SIGNAL)
474        out = self.__call(method, *args)
475        logging.debug('Issued call ' + str(method) +
476                      ' with async_id ' + str(out))
477        result = {}
478        try:
479            # __wait_for_specific_signal has a 10s timeout
480            result = utils.poll_for_condition(
481                lambda: self.__wait_for_specific_signal(
482                    self.ASYNC_CALL_STATUS_SIGNAL, {'async_id' : out}),
483                timeout=180,
484                desc='matching %s signal' % self.ASYNC_CALL_STATUS_SIGNAL)
485        except utils.TimeoutError, e:
486            logging.error('Cryptohome timed out. Sending ABRT.')
487            crash_cryptohomed()
488            raise ChromiumOSError('cryptohomed aborted. Check crashes!')
489        return result
490
491
492    def mount(self, user, password, create=False, async=True):
493        """Mounts a cryptohome.
494
495        Returns True if the mount succeeds or False otherwise.
496        TODO(ellyjones): Migrate mount_vault() to use a multi-user-safe
497        heuristic, then remove this method. See <crosbug.com/20778>.
498        """
499        if async:
500            return self.__async_call(self.iface.AsyncMount, user, password,
501                                     create, False, [])['return_status']
502        out = self.__call(self.iface.Mount, user, password, create, False, [])
503        # Sync returns (return code, return status)
504        return out[1] if len(out) > 1 else False
505
506
507    def unmount(self, user):
508        """Unmounts a cryptohome.
509
510        Returns True if the unmount suceeds or false otherwise.
511        TODO(ellyjones): Once there's a per-user unmount method, use it. See
512        <crosbug.com/20778>.
513        """
514        return self.__call(self.iface.Unmount)
515
516
517    def is_mounted(self, user):
518        """Tests whether a user's cryptohome is mounted."""
519        return (utils.is_mountpoint(user_path(user))
520                and utils.is_mountpoint(system_path(user)))
521
522
523    def require_mounted(self, user):
524        """Raises a test failure if a user's cryptohome is not mounted."""
525        utils.require_mountpoint(user_path(user))
526        utils.require_mountpoint(system_path(user))
527
528
529    def migrate(self, user, oldkey, newkey, async=True):
530        """Migrates the specified user's cryptohome from one key to another."""
531        if async:
532            return self.__async_call(self.iface.AsyncMigrateKey,
533                                     user, oldkey, newkey)['return_status']
534        return self.__call(self.iface.MigrateKey, user, oldkey, newkey)
535
536
537    def remove(self, user, async=True):
538        if async:
539            return self.__async_call(self.iface.AsyncRemove,
540                                     user)['return_status']
541        return self.__call(self.iface.Remove, user)
542
543
544    def ensure_clean_cryptohome_for(self, user, password=None):
545        """Ensure a fresh cryptohome exists for user.
546
547        @param user: user who needs a shiny new cryptohome.
548        @param password: if unset, a random password will be used.
549        """
550        if not password:
551            password = ''.join(random.sample(string.ascii_lowercase, 6))
552        self.remove(user)
553        self.mount(user, password, create=True)
554