1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os
6from autotest_lib.client.bin import test, utils
7from autotest_lib.client.common_lib import error
8import tempfile
9
10class security_ModuleLocking(test.test):
11    """
12    Handle examining the system for specific module loading capabilities.
13    """
14    version = 1
15
16    def _passed(self, msg):
17        logging.info('ok: %s', msg)
18
19    def _failed(self, msg):
20        logging.error('FAIL: %s', msg)
21        self._failures.append(msg)
22
23    def _fatal(self, msg):
24        logging.error('FATAL: %s', msg)
25        raise error.TestError(msg)
26
27    def check(self, boolean, msg, fatal=False):
28        """
29        Check boolean state and report condition to log.
30
31        @param boolean: condition to examine
32        @param msg: what the condition is testing
33        @param fatal: should the test full abort on the condition failing
34        """
35        if boolean == True:
36            self._passed(msg)
37        else:
38            msg = "could not satisfy '%s'" % (msg)
39            if fatal:
40                self._fatal(msg)
41            else:
42                self._failed(msg)
43
44    def module_loaded(self, module):
45        """
46        Detect if the given module is already loaded in the kernel.
47
48        @param module: name of module to check
49        """
50        module = module.replace('-', '_')
51        match = "%s " % (module)
52        for line in open("/proc/modules"):
53            if line.startswith(match):
54                return True
55        return False
56
57    def rmmod(self, module):
58        """
59        Unload a module if it is already loaded in the kernel.
60
61        @param module: name of module to unload
62        """
63        if self.module_loaded(module):
64            utils.system("rmmod %s" % (module))
65
66    def modprobe(self, module):
67        """
68        If a module is not already loaded in the kernel, load it via modprobe.
69
70        @param module: name of module to load
71        """
72        if not self.module_loaded(module):
73            utils.system("modprobe %s" % (module))
74
75    def _module_path(self, module):
76        """
77        Locate a kernel module's full filesystem path.
78
79        @param module: name of module to locate
80        """
81        ko = utils.system_output("find /lib/modules -name '%s.ko'" % (module))
82        return ko.splitlines()[0]
83
84    def module_loads_outside_rootfs(self, module):
85        """
86        Copies the given module into /tmp and tries to load it from there
87        using insmod directly.
88
89        @param module: name of module to test
90        """
91        # Start from a clean slate.
92        self.rmmod(module)
93
94        # Make sure we can load with standard mechanisms.
95        self.modprobe(module)
96        self.rmmod(module)
97
98        # Load module directly with insmod from root filesystem.
99        ko = self._module_path(module)
100        utils.system("insmod %s" % (ko))
101        self.rmmod(module)
102
103        # Load module directly with insmod from /tmp.
104        tmp = "/tmp/%s.ko" % (module)
105        utils.system("cp %s %s" % (ko, tmp))
106        rc = utils.system("insmod %s" % (tmp), ignore_status=True)
107
108        # Clean up.
109        self.rmmod(module)
110        utils.system("rm %s" % (tmp))
111
112        if rc == 0:
113            return True
114        return False
115
116    def module_loads_old_api(self, module):
117        """
118        Loads a module using the old blob-style kernel syscall. With
119        kmod, this requires compressing the module first to trigger
120        in-memory decompression and loading.
121
122        @param module: name of module to test
123        """
124        # Start from a clean slate.
125        self.rmmod(module)
126
127        # Compress module to trigger the old API.
128        tmp = "/tmp/%s.ko.gz" % (module)
129        ko = self._module_path(module)
130        utils.system("gzip -c %s > %s" % (ko, tmp))
131        rc = utils.system("insmod %s" % (tmp), ignore_status=True)
132
133        # Clean up.
134        self.rmmod(module)
135        utils.system("rm %s" % (tmp))
136
137        if rc == 0:
138            return True
139        return False
140
141    def module_loads_after_bind_umount(self, module):
142        """
143        Makes sure modules can still load after a bind mount of the
144        filesystem is umounted.
145
146        @param module: name of module to test
147        """
148
149        # Start from a clean slate.
150        self.rmmod(module)
151
152        # Make sure we can load with standard mechanisms.
153        self.modprobe(module)
154        self.rmmod(module)
155
156        # Create and umount a bind mount of the root filesystem.
157        bind = tempfile.mkdtemp(prefix=module)
158        rc = utils.system("mount -o bind / %s && umount %s" % (bind, bind))
159        utils.system("rmdir %s" % (bind))
160
161        # Attempt to load again.
162        self.modprobe(module)
163        self.rmmod(module)
164
165        if rc == 0:
166            return True
167        return False
168
169    def run_once(self):
170        """
171        Check that the fd-based module loading syscall is enforcing the
172        module fd origin to the root filesystem, and that it can be
173        disabled and will allow the old syscall API as well.
174        TODO(keescook): add production test to make sure that on a verified
175        boot, "/proc/sys/kernel/chromiumos/module_locking" does not exist.
176        """
177        # Empty failure list means test passes.
178        self._failures = []
179
180        # Check that the sysctl is either missing or set to 1.
181        sysctl = "/proc/sys/kernel/chromiumos/module_locking"
182        if os.path.exists(sysctl):
183            self.check(open(sysctl).read() == '1\n', "%s enabled" % (sysctl))
184
185        # Check the enforced state is to deny non-rootfs module loads.
186        module = "test_module"
187        loaded = self.module_loads_outside_rootfs(module)
188        self.check(loaded == False, "cannot load %s from /tmp" % (module))
189
190        # Check old API fails when enforcement enabled.
191        loaded = self.module_loads_old_api(module)
192        self.check(loaded == False, "cannot load %s with old API" % (module))
193
194        # Make sure the bind umount bug is not present.
195        loaded = self.module_loads_after_bind_umount(module)
196        self.check(loaded == True, "can load %s after bind umount" % (module))
197
198        # If the sysctl exists, verify that it will disable the restriction.
199        if os.path.exists(sysctl):
200            # Disable restriction.
201            open(sysctl, "w").write("0\n")
202            self.check(open(sysctl).read() == '0\n', "%s disabled" % (sysctl))
203
204            # Check enforcement is disabled.
205            loaded = self.module_loads_outside_rootfs(module)
206            self.check(loaded == True, "can load %s from /tmp" % (module))
207
208            # Check old API works when enforcement disabled.
209            loaded = self.module_loads_old_api(module)
210            self.check(loaded == True, "can load %s with old API" % (module))
211
212            # Clean up.
213            open(sysctl, "w").write("1\n")
214            self.check(open(sysctl).read() == '1\n', "%s enabled" % (sysctl))
215
216        # Raise a failure if anything unexpected was seen.
217        if len(self._failures):
218            raise error.TestFail((", ".join(self._failures)))
219