1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus, gobject, logging, os, random, re, shutil, string
6from dbus.mainloop.glib import DBusGMainLoop
7
8import common, constants
9from autotest_lib.client.bin import utils
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.cros.cros_disks import DBusClient
12
13CRYPTOHOME_CMD = '/usr/sbin/cryptohome'
14GUEST_USER_NAME = '$guest'
15UNAVAILABLE_ACTION = 'Unknown action or no action given.'
16
17class ChromiumOSError(error.TestError):
18    """Generic error for ChromiumOS-specific exceptions."""
19    pass
20
21def __run_cmd(cmd):
22    return utils.system_output(cmd + ' 2>&1', retain_output=True,
23                               ignore_status=True).strip()
24
25def get_user_hash(user):
26    """Get the user hash for the given user."""
27    return utils.system_output(['cryptohome', '--action=obfuscate_user',
28                                '--user=%s' % user])
29
30
31def user_path(user):
32    """Get the user mount point for the given user."""
33    return utils.system_output(['cryptohome-path', 'user', user])
34
35
36def system_path(user):
37    """Get the system mount point for the given user."""
38    return utils.system_output(['cryptohome-path', 'system', user])
39
40
41def ensure_clean_cryptohome_for(user, password=None):
42    """Ensure a fresh cryptohome exists for user.
43
44    @param user: user who needs a shiny new cryptohome.
45    @param password: if unset, a random password will be used.
46    """
47    if not password:
48        password = ''.join(random.sample(string.ascii_lowercase, 6))
49    remove_vault(user)
50    mount_vault(user, password, create=True)
51
52
53def get_tpm_status():
54    """Get the TPM status.
55
56    Returns:
57        A TPM status dictionary, for example:
58        { 'Enabled': True,
59          'Owned': True,
60          'Being Owned': False,
61          'Ready': True,
62          'Password': ''
63        }
64    """
65    out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_status')
66    status = {}
67    for field in ['Enabled', 'Owned', 'Being Owned', 'Ready']:
68        match = re.search('TPM %s: (true|false)' % field, out)
69        if not match:
70            raise ChromiumOSError('Invalid TPM status: "%s".' % out)
71        status[field] = match.group(1) == 'true'
72    match = re.search('TPM Password: (\w*)', out)
73    status['Password'] = ''
74    if match:
75        status['Password'] = match.group(1)
76    return status
77
78
79def get_tpm_more_status():
80    """Get more of the TPM status.
81
82    Returns:
83        A TPM more status dictionary, for example:
84        { 'dictionary_attack_lockout_in_effect': False,
85          'attestation_prepared': False,
86          'boot_lockbox_finalized': False,
87          'enabled': True,
88          'owned': True,
89          'owner_password': ''
90          'dictionary_attack_counter': 0,
91          'dictionary_attack_lockout_seconds_remaining': 0,
92          'dictionary_attack_threshold': 10,
93          'attestation_enrolled': False,
94          'initialized': True,
95          'verified_boot_measured': False,
96          'install_lockbox_finalized': True
97        }
98        An empty dictionary is returned if the command is not supported.
99    """
100    status = {}
101    out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_more_status | grep :')
102    if out.startswith(UNAVAILABLE_ACTION):
103        # --action=tpm_more_status only exists >= 41.
104        logging.info('Method not supported!')
105        return status
106    for line in out.splitlines():
107        items = line.strip().split(':')
108        if items[1].strip() == 'false':
109            value = False
110        elif items[1].strip() == 'true':
111            value = True
112        elif items[1].strip().isdigit():
113            value = int(items[1].strip())
114        else:
115            value = items[1].strip(' "')
116        status[items[0]] = value
117    return status
118
119
120def is_tpm_lockout_in_effect():
121    """Returns true if the TPM lockout is in effect; false otherwise."""
122    status = get_tpm_more_status()
123    return status.get('dictionary_attack_lockout_in_effect', None)
124
125
126def get_login_status():
127    """Query the login status
128
129    Returns:
130        A login status dictionary containing:
131        { 'owner_user_exists': True|False,
132          'boot_lockbox_finalized': True|False
133        }
134    """
135    out = __run_cmd(CRYPTOHOME_CMD + ' --action=get_login_status')
136    status = {}
137    for field in ['owner_user_exists', 'boot_lockbox_finalized']:
138        match = re.search('%s: (true|false)' % field, out)
139        if not match:
140            raise ChromiumOSError('Invalid login status: "%s".' % out)
141        status[field] = match.group(1) == 'true'
142    return status
143
144
145def get_tpm_attestation_status():
146    """Get the TPM attestation status.  Works similar to get_tpm_status().
147    """
148    out = __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_attestation_status')
149    status = {}
150    for field in ['Prepared', 'Enrolled']:
151        match = re.search('Attestation %s: (true|false)' % field, out)
152        if not match:
153            raise ChromiumOSError('Invalid attestation status: "%s".' % out)
154        status[field] = match.group(1) == 'true'
155    return status
156
157
158def take_tpm_ownership():
159    """Take TPM owernship.
160
161    Blocks until TPM is owned.
162    """
163    __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_take_ownership')
164    __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_wait_ownership')
165
166
167def verify_ek():
168    """Verify the TPM endorsement key.
169
170    Returns true if EK is valid.
171    """
172    cmd = CRYPTOHOME_CMD + ' --action=tpm_verify_ek'
173    return (utils.system(cmd, ignore_status=True) == 0)
174
175
176def remove_vault(user):
177    """Remove the given user's vault from the shadow directory."""
178    logging.debug('user is %s', user)
179    user_hash = get_user_hash(user)
180    logging.debug('Removing vault for user %s with hash %s' % (user, user_hash))
181    cmd = CRYPTOHOME_CMD + ' --action=remove --force --user=%s' % user
182    __run_cmd(cmd)
183    # Ensure that the vault does not exist.
184    if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
185        raise ChromiumOSError('Cryptohome could not remove the user\'s vault.')
186
187
188def remove_all_vaults():
189    """Remove any existing vaults from the shadow directory.
190
191    This function must be run with root privileges.
192    """
193    for item in os.listdir(constants.SHADOW_ROOT):
194        abs_item = os.path.join(constants.SHADOW_ROOT, item)
195        if os.path.isdir(os.path.join(abs_item, 'vault')):
196            logging.debug('Removing vault for user with hash %s' % item)
197            shutil.rmtree(abs_item)
198
199
200def mount_vault(user, password, create=False):
201    """Mount the given user's vault."""
202    args = [CRYPTOHOME_CMD, '--action=mount', '--user=%s' % user,
203            '--password=%s' % password, '--async']
204    if create:
205        args.append('--create')
206    logging.info(__run_cmd(' '.join(args)))
207    # Ensure that the vault exists in the shadow directory.
208    user_hash = get_user_hash(user)
209    if not os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
210        raise ChromiumOSError('Cryptohome vault not found after mount.')
211    # Ensure that the vault is mounted.
212    if not is_vault_mounted(
213            user=user,
214            device_regex=constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER,
215            allow_fail=True):
216        raise ChromiumOSError('Cryptohome created a vault but did not mount.')
217
218
219def mount_guest():
220    """Mount the given user's vault."""
221    args = [CRYPTOHOME_CMD, '--action=mount_guest', '--async']
222    logging.info(__run_cmd(' '.join(args)))
223    # Ensure that the guest tmpfs is mounted.
224    if not is_guest_vault_mounted(allow_fail=True):
225        raise ChromiumOSError('Cryptohome did not mount tmpfs.')
226
227
228def test_auth(user, password):
229    cmd = [CRYPTOHOME_CMD, '--action=test_auth', '--user=%s' % user,
230           '--password=%s' % password, '--async']
231    return 'Authentication succeeded' in utils.system_output(cmd)
232
233
234def unmount_vault(user):
235    """Unmount the given user's vault.
236
237    Once unmounting for a specific user is supported, the user parameter will
238    name the target user. See crosbug.com/20778.
239    """
240    __run_cmd(CRYPTOHOME_CMD + ' --action=unmount')
241    # Ensure that the vault is not mounted.
242    if is_vault_mounted(user, allow_fail=True):
243        raise ChromiumOSError('Cryptohome did not unmount the user.')
244
245
246def __get_mount_info(mount_point, allow_fail=False):
247    """Get information about the active mount at a given mount point."""
248    cryptohomed_path = '/proc/$(pgrep cryptohomed)/mounts'
249    try:
250        logging.info(utils.system_output('cat %s' % cryptohomed_path))
251        mount_line = utils.system_output(
252            'grep %s %s' % (mount_point, cryptohomed_path),
253            ignore_status=allow_fail)
254    except Exception as e:
255        logging.error(e)
256        raise ChromiumOSError('Could not get info about cryptohome vault '
257                              'through %s. See logs for complete mount-point.'
258                              % os.path.dirname(str(mount_point)))
259    return mount_line.split()
260
261
262def __get_user_mount_info(user, allow_fail=False):
263    """Get information about the active mounts for a given user.
264
265    Returns the active mounts at the user's user and system mount points. If no
266    user is given, the active mount at the shared mount point is returned
267    (regular users have a bind-mount at this mount point for backwards
268    compatibility; the guest user has a mount at this mount point only).
269    """
270    return [__get_mount_info(mount_point=user_path(user),
271                             allow_fail=allow_fail),
272            __get_mount_info(mount_point=system_path(user),
273                             allow_fail=allow_fail)]
274
275def is_vault_mounted(
276        user,
277        device_regex=constants.CRYPTOHOME_DEV_REGEX_ANY,
278        fs_regex=constants.CRYPTOHOME_FS_REGEX_ANY,
279        allow_fail=False):
280    """Check whether a vault is mounted for the given user.
281
282    If no user is given, the shared mount point is checked, determining whether
283    a vault is mounted for any user.
284    """
285    user_mount_info = __get_user_mount_info(user=user, allow_fail=allow_fail)
286    for mount_info in user_mount_info:
287        if (len(mount_info) < 3 or
288                not re.match(device_regex, mount_info[0]) or
289                not re.match(fs_regex, mount_info[2])):
290            return False
291    return True
292
293
294def is_guest_vault_mounted(allow_fail=False):
295    """Check whether a vault backed by tmpfs is mounted for the guest user."""
296    return is_vault_mounted(
297        user=GUEST_USER_NAME,
298        device_regex=constants.CRYPTOHOME_DEV_REGEX_GUEST,
299        fs_regex=constants.CRYPTOHOME_FS_REGEX_TMPFS,
300        allow_fail=allow_fail)
301
302
303def get_mounted_vault_devices(user, allow_fail=False):
304    """Get the device(s) backing the vault mounted for the given user.
305
306    Returns the devices mounted at the user's user and system mount points. If
307    no user is given, the device mounted at the shared mount point is returned.
308    """
309    return [mount_info[0]
310            for mount_info
311            in __get_user_mount_info(user=user, allow_fail=allow_fail)
312            if len(mount_info)]
313
314
315def canonicalize(credential):
316    """Perform basic canonicalization of |email_address|.
317
318    Perform basic canonicalization of |email_address|, taking into account that
319    gmail does not consider '.' or caps inside a username to matter. It also
320    ignores everything after a '+'. For example,
321    c.masone+abc@gmail.com == cMaSone@gmail.com, per
322    http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313
323    """
324    if not credential:
325      return None
326
327    parts = credential.split('@')
328    if len(parts) != 2:
329        raise error.TestError('Malformed email: ' + credential)
330
331    (name, domain) = parts
332    name = name.partition('+')[0]
333    if (domain == constants.SPECIAL_CASE_DOMAIN):
334        name = name.replace('.', '')
335    return '@'.join([name, domain]).lower()
336
337
338def crash_cryptohomed():
339    # Try to kill cryptohomed so we get something to work with.
340    pid = __run_cmd('pgrep cryptohomed')
341    try:
342        pid = int(pid)
343    except ValueError, e:  # empty or invalid string
344        raise error.TestError('Cryptohomed was not running')
345    utils.system('kill -ABRT %d' % pid)
346    # CONT just in case cryptohomed had a spurious STOP.
347    utils.system('kill -CONT %d' % pid)
348    utils.poll_for_condition(
349        lambda: utils.system('ps -p %d' % pid,
350                             ignore_status=True) != 0,
351            timeout=180,
352            exception=error.TestError(
353                'Timeout waiting for cryptohomed to coredump'))
354
355
356class CryptohomeProxy(DBusClient):
357    """A DBus proxy client for testing the Cryptohome DBus server.
358    """
359    CRYPTOHOME_BUS_NAME = 'org.chromium.Cryptohome'
360    CRYPTOHOME_OBJECT_PATH = '/org/chromium/Cryptohome'
361    CRYPTOHOME_INTERFACE = 'org.chromium.CryptohomeInterface'
362    ASYNC_CALL_STATUS_SIGNAL = 'AsyncCallStatus'
363    ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS = (
364        'async_id', 'return_status', 'return_code'
365    )
366    DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
367
368
369    def __init__(self, bus_loop=None):
370        self.main_loop = gobject.MainLoop()
371        if bus_loop is None:
372            bus_loop = DBusGMainLoop(set_as_default=True)
373        self.bus = dbus.SystemBus(mainloop=bus_loop)
374        super(CryptohomeProxy, self).__init__(self.main_loop, self.bus,
375                                              self.CRYPTOHOME_BUS_NAME,
376                                              self.CRYPTOHOME_OBJECT_PATH)
377        self.iface = dbus.Interface(self.proxy_object,
378                                    self.CRYPTOHOME_INTERFACE)
379        self.properties = dbus.Interface(self.proxy_object,
380                                         self.DBUS_PROPERTIES_INTERFACE)
381        self.handle_signal(self.CRYPTOHOME_INTERFACE,
382                           self.ASYNC_CALL_STATUS_SIGNAL,
383                           self.ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS)
384
385
386    # Wrap all proxied calls to catch cryptohomed failures.
387    def __call(self, method, *args):
388        try:
389            return method(*args, timeout=180)
390        except dbus.exceptions.DBusException, e:
391            if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply':
392                logging.error('Cryptohome is not responding. Sending ABRT')
393                crash_cryptohomed()
394                raise ChromiumOSError('cryptohomed aborted. Check crashes!')
395            raise e
396
397
398    def __wait_for_specific_signal(self, signal, data):
399      """Wait for the |signal| with matching |data|
400         Returns the resulting dict on success or {} on error.
401      """
402      # Do not bubble up the timeout here, just return {}.
403      result = {}
404      try:
405          result = self.wait_for_signal(signal)
406      except utils.TimeoutError:
407          return {}
408      for k in data.keys():
409          if not result.has_key(k) or result[k] != data[k]:
410            return {}
411      return result
412
413
414    # Perform a data-less async call.
415    # TODO(wad) Add __async_data_call.
416    def __async_call(self, method, *args):
417        # Clear out any superfluous async call signals.
418        self.clear_signal_content(self.ASYNC_CALL_STATUS_SIGNAL)
419        out = self.__call(method, *args)
420        logging.debug('Issued call ' + str(method) +
421                      ' with async_id ' + str(out))
422        result = {}
423        try:
424            # __wait_for_specific_signal has a 10s timeout
425            result = utils.poll_for_condition(
426                lambda: self.__wait_for_specific_signal(
427                    self.ASYNC_CALL_STATUS_SIGNAL, {'async_id' : out}),
428                timeout=180,
429                desc='matching %s signal' % self.ASYNC_CALL_STATUS_SIGNAL)
430        except utils.TimeoutError, e:
431            logging.error('Cryptohome timed out. Sending ABRT.')
432            crash_cryptohomed()
433            raise ChromiumOSError('cryptohomed aborted. Check crashes!')
434        return result
435
436
437    def mount(self, user, password, create=False, async=True):
438        """Mounts a cryptohome.
439
440        Returns True if the mount succeeds or False otherwise.
441        TODO(ellyjones): Migrate mount_vault() to use a multi-user-safe
442        heuristic, then remove this method. See <crosbug.com/20778>.
443        """
444        if async:
445            return self.__async_call(self.iface.AsyncMount, user, password,
446                                     create, False, [])['return_status']
447        out = self.__call(self.iface.Mount, user, password, create, False, [])
448        # Sync returns (return code, return status)
449        return out[1] if len(out) > 1 else False
450
451
452    def unmount(self, user):
453        """Unmounts a cryptohome.
454
455        Returns True if the unmount suceeds or false otherwise.
456        TODO(ellyjones): Once there's a per-user unmount method, use it. See
457        <crosbug.com/20778>.
458        """
459        return self.__call(self.iface.Unmount)
460
461
462    def is_mounted(self, user):
463        """Tests whether a user's cryptohome is mounted."""
464        return (utils.is_mountpoint(user_path(user))
465                and utils.is_mountpoint(system_path(user)))
466
467
468    def require_mounted(self, user):
469        """Raises a test failure if a user's cryptohome is not mounted."""
470        utils.require_mountpoint(user_path(user))
471        utils.require_mountpoint(system_path(user))
472
473
474    def migrate(self, user, oldkey, newkey, async=True):
475        """Migrates the specified user's cryptohome from one key to another."""
476        if async:
477            return self.__async_call(self.iface.AsyncMigrateKey,
478                                     user, oldkey, newkey)['return_status']
479        return self.__call(self.iface.MigrateKey, user, oldkey, newkey)
480
481
482    def remove(self, user, async=True):
483        if async:
484            return self.__async_call(self.iface.AsyncRemove,
485                                     user)['return_status']
486        return self.__call(self.iface.Remove, user)
487
488
489    def ensure_clean_cryptohome_for(self, user, password=None):
490        """Ensure a fresh cryptohome exists for user.
491
492        @param user: user who needs a shiny new cryptohome.
493        @param password: if unset, a random password will be used.
494        """
495        if not password:
496            password = ''.join(random.sample(string.ascii_lowercase, 6))
497        self.remove(user)
498        self.mount(user, password, create=True)
499