1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import errno
6import logging
7import os
8import os.path
9import pyudev
10import stat
11import subprocess
12import tempfile
13import threading
14from autotest_lib.client.bin import test, utils
15from autotest_lib.client.common_lib import error
16from autotest_lib.client.cros import device_jail_utils
17
18
class security_DeviceJail_Filesystem(test.test):
    """
    Ensures that if the device jail filesystem is present, it restricts
    the devices that can be seen and instantiates jails.

    warmup() starts device_jail_fs on a temporary mount point, run_once()
    walks that mount point and checks every character device found under
    it, and cleanup() unmounts the filesystem and removes the mount point.
    """
    version = 1

    # Seconds cleanup() waits for device_jail_fs to exit before killing it.
    SHUTDOWN_TIMEOUT_SEC = 5

    # Device paths (relative to the mount point) treated as passthrough
    # devices, i.e. allowed to appear without being jailed.
    PASSTHROUGH_DEVICES = ('null', 'full', 'zero', 'urandom')

    def warmup(self):
        """
        Creates a temporary mount point and launches device_jail_fs on it.

        Raises:
            error.TestNAError: the jail control path or the device_jail_fs
                executable does not exist.
            error.TestError: device_jail_fs could not be started for any
                other reason.
        """
        super(security_DeviceJail_Filesystem, self).warmup()
        if not os.path.exists(device_jail_utils.JAIL_CONTROL_PATH):
            raise error.TestNAError('Device jail is not present')

        self._mount = tempfile.mkdtemp(prefix='djfs_test_')
        logging.debug('Attempting to mount device_jail_fs on %s', self._mount)
        # NOTE(review): nothing here waits for the mount to become visible
        # before run_once() walks it -- confirm device_jail_fs only
        # returns control once the filesystem is mounted.
        try:
            self._subprocess = subprocess.Popen(['device_jail_fs', self._mount])
        except OSError as e:
            if e.errno == errno.ENOENT:
                raise error.TestNAError('Device jail FS is not present')
            else:
                raise error.TestError(
                    'Device jail FS failed to start: %s' % e.strerror)


    def _is_jail_device(self, filename):
        """
        Returns True if udev reports the device node at |filename| as
        belonging to the 'device_jail' subsystem.
        """
        context = pyudev.Context()
        device = pyudev.Device.from_device_file(context, filename)
        return device.subsystem == 'device_jail'


    def _check_device(self, filename):
        """
        Verifies that a single character device is permitted to be visible.

        Args:
            filename: symlink-resolved absolute path of the device node.

        Raises:
            error.TestError: the device is under bus/usb but is not a jail
                device, or it is any other non-passthrough device inside
                the mount point.
        """
        logging.debug('Checking device %s', filename)
        # run_once() hands us symlink-resolved paths, so resolve the mount
        # point the same way; the trailing separator keeps a sibling
        # directory that merely shares the prefix from matching.
        mount_root = os.path.realpath(self._mount)

        # Devices outside of the FS are fine.
        if not filename.startswith(mount_root + os.sep):
            return

        # Remove mount from full path.
        relative_dev_path = filename[len(mount_root) + 1:]

        # Check if this is a passthrough device.
        if relative_dev_path in self.PASSTHROUGH_DEVICES:
            return

        # Ensure USB devices are jailed.
        if relative_dev_path.startswith('bus/usb'):
            if self._is_jail_device(filename):
                return
            raise error.TestError('Device should be jailed: %s' % filename)

        # All other devices should be hidden.
        raise error.TestError('Device should be hidden: %s' % filename)


    def run_once(self):
        """
        Walks the mount point and checks every character device under it.

        Entries that cannot be stat()ed are skipped.
        """
        for dirpath, _, filenames in os.walk(self._mount):
            for filename in filenames:
                real_path = os.path.realpath(os.path.join(dirpath, filename))
                try:
                    mode = os.stat(real_path).st_mode
                except OSError:
                    continue

                if stat.S_ISCHR(mode):
                    self._check_device(real_path)


    def _clean_shutdown(self):
        """
        Unmounts the filesystem with fusermount and waits for
        device_jail_fs to exit.

        Raises:
            subprocess.CalledProcessError: fusermount exited non-zero.
        """
        subprocess.check_call(['fusermount', '-u', self._mount])
        self._subprocess.wait()


    def _kill_subprocess(self):
        """Kills device_jail_fs if it is still running, then reaps it."""
        # Only signal a process that has not been reaped yet.
        if self._subprocess.poll() is None:
            self._subprocess.kill()
        self._subprocess.wait()


    def _hard_shutdown(self):
        """Timer callback: kills device_jail_fs after the timeout expires."""
        logging.warning('Timeout expired, killing device_jail_fs')
        self._kill_subprocess()


    def cleanup(self):
        """
        Shuts down device_jail_fs and removes the temporary mount point.

        A clean unmount is attempted first; if it has not finished within
        SHUTDOWN_TIMEOUT_SEC seconds a timer thread kills the process.
        Does nothing beyond the base-class cleanup when warmup() bailed
        out before creating the mount point.

        Raises:
            error.TestError: the mount point directory could not be removed.
        """
        super(security_DeviceJail_Filesystem, self).cleanup()
        if hasattr(self, '_subprocess'):
            logging.info('Waiting %d seconds for device_jail_fs shutdown',
                         self.SHUTDOWN_TIMEOUT_SEC)
            timeout = threading.Timer(self.SHUTDOWN_TIMEOUT_SEC,
                                      self._hard_shutdown)
            timeout.start()
            try:
                self._clean_shutdown()
            except Exception:
                # The timer is cancelled below, so it can no longer be
                # relied upon to stop the process; do it here.
                logging.error('Clean shutdown failed, killing device_jail_fs')
                self._kill_subprocess()
                raise
            finally:
                # Never leave the timer armed past cleanup().
                timeout.cancel()

        # warmup() may have raised before the mount point was created.
        mount = getattr(self, '_mount', None)
        if mount is None:
            return

        try:
            os.rmdir(mount)
        except OSError as e:
            raise error.TestError('Failed to remove temp dir: %s' % e.strerror)
114