#!/usr/bin/env python3

import argparse
import logging
import operator
import os
import re
import textwrap

from gensyscalls import SupportedArchitectures, SysCallsTxtParser


# Text templates for the BPF statements written into the generated C++ array.
# In the jump templates {0} is the constant the syscall number is compared
# with, {1} is the offset taken when the comparison holds and {2} the offset
# taken when it does not.
BPF_JGE = "BPF_JUMP(BPF_JMP|BPF_JGE|BPF_K, {0}, {1}, {2})"
BPF_JEQ = "BPF_JUMP(BPF_JMP|BPF_JEQ|BPF_K, {0}, {1}, {2})"
BPF_ALLOW = "BPF_STMT(BPF_RET|BPF_K, SECCOMP_RET_ALLOW)"


class SyscallRange:
    """A half-open run [begin, end) of consecutive syscall numbers.

    `names` lists the syscall names in the order they were added; the
    generator only uses them for comments in the emitted filter.
    """

    def __init__(self, name, value):
        self.names = [name]
        self.begin = value
        self.end = self.begin + 1

    def __str__(self):
        return "(%s, %s, %s)" % (self.begin, self.end, self.names)

    def add(self, name, value):
        """Extend the range by one syscall.

        Raises:
            ValueError: if `value` is not exactly the current `end`, i.e. the
                syscall is not contiguous with this range.
        """
        if value != self.end:
            raise ValueError(
                "syscall %s (%s) is not contiguous with range ending at %s"
                % (name, value, self.end))
        self.end += 1
        self.names.append(name)


def load_syscall_names_from_file(file_path, architecture):
    """Return the set of syscall names in `file_path` enabled for an arch.

    A parsed entry is kept when its `architecture` key is truthy.
    """
    parser = SysCallsTxtParser()
    # Close the file deterministically instead of leaking the handle.
    # NOTE(review): assumes parse_open_file() consumes the file before
    # returning - confirm against gensyscalls.
    with open(file_path) as syscall_file:
        parser.parse_open_file(syscall_file)
    return {x["name"] for x in parser.syscalls if x.get(architecture)}


def load_syscall_priorities_from_file(file_path):
    """Return the syscall names listed in `file_path`, in file order.

    Only lines consisting of a single identifier (optionally surrounded by
    whitespace) are used; every other line is skipped.
    """
    format_re = re.compile(r'^\s*([A-Za-z_][A-Za-z0-9_]+)\s*$')
    with open(file_path) as priority_file:
        matches = (format_re.match(line) for line in priority_file)
        # group(1) always exists on a successful match, so no error handling
        # is needed here.
        return [match.group(1) for match in matches if match is not None]


def merge_names(base_names, allowlist_names, blocklist_names):
    """Return (base - blocklist) | allowlist.

    Raises:
        RuntimeError: if the blocklist names a syscall that is not in the
            base set.
    """
    unknown = blocklist_names - base_names
    if unknown:
        raise RuntimeError("blocklist item not in bionic - aborting " +
                           str(unknown))

    return (base_names - blocklist_names) | allowlist_names


def extract_priority_syscalls(syscalls, priorities):
    """Split (name, number) pairs into prioritized and remaining syscalls.

    Returns:
        A tuple (priority_syscalls, other_syscalls). Prioritized syscalls keep
        the order in which they appear in `priorities`; names in `priorities`
        that are not in `syscalls` are ignored. The remaining syscalls keep
        their input order.
    """
    # Build the lookup structures once instead of scanning the priority list
    # for every syscall.
    priority_names = set(priorities)
    other_syscalls = [
        syscall for syscall in syscalls if syscall[0] not in priority_names]
    # For prioritized syscalls, keep the order in which they appear in the
    # priority list
    syscall_dict = {syscall[0]: syscall[1] for syscall in syscalls}
    priority_syscalls = [
        (name, syscall_dict[name])
        for name in priorities if name in syscall_dict]
    return priority_syscalls, other_syscalls


def parse_syscall_NRs(names_path):
    """Parse syscall numbers out of a preprocessed header.

    The input is the preprocessed source file. It contains a lot of junk from
    the preprocessor, but the lines of interest have the format:

        #define __(ARM_)?NR_${NAME} ${VALUE}

    where ${VALUE} is a preprocessor expression that may refer to constants
    defined on earlier lines.

    Returns:
        A dict mapping syscall name to number. The "__NR_" prefix is stripped
        from names; "__ARM_NR" names are kept unchanged.
    """
    constant_re = re.compile(
        r'^\s*#define\s+([A-Za-z_][A-Za-z0-9_]+)\s+(.+)\s*$')
    token_re = re.compile(r'\b[A-Za-z_][A-Za-z0-9_]+\b')
    constants = {}
    with open(names_path) as f:
        for line in f:
            m = constant_re.match(line)
            if m is None:
                continue
            name = m.group(1)
            try:
                # eval() takes care of any arithmetic that may be done.
                # NOTE(review): eval() executes text taken from names_path;
                # only feed this function build-generated preprocessor
                # output, never untrusted files.
                value = eval(token_re.sub(
                    lambda x: str(constants[x.group(0)]), m.group(2)))
            except Exception:  # pylint: disable=broad-except
                # Expected for most junk lines: KeyError when the expression
                # refers to a token that has not been defined yet, and
                # SyntaxError because not all C integer literals are valid
                # Python literals (e.g. 10L). Catching Exception rather than
                # everything lets Ctrl-C and SystemExit propagate.
                logging.debug('Failed to parse %s', line)
                continue
            constants[name] = value

    syscalls = {}
    for name, value in constants.items():
        if name.startswith("__NR_"):
            # Remove the __NR_ prefix
            name = name[len("__NR_"):]
        elif not name.startswith("__ARM_NR"):
            continue
        syscalls[name] = value

    return syscalls


def convert_NRs_to_ranges(syscalls):
    """Group (name, number) pairs into SyscallRange runs of adjacent numbers.

    Returns:
        A list of SyscallRange objects ordered by syscall number.
    """
    # Sort the values so we convert to ranges and binary chop
    syscalls = sorted(syscalls, key=operator.itemgetter(1))

    # Turn into a list of ranges. Keep the names for the comments
    ranges = []
    for name, value in syscalls:
        if ranges and ranges[-1].end == value:
            ranges[-1].add(name, value)
        else:
            ranges.append(SyscallRange(name, value))
    return ranges


# Converts the sorted ranges of allowed syscalls to a binary tree bpf
# For a single range, output a simple jump to {fail} or {allow}.
# We can't set the jump ranges yet, since we don't know the size of the
# filter, so use a placeholder.
# For multiple ranges, split into two, convert the two halves and output a jump
# to the correct half.
def convert_to_intermediate_bpf(ranges):
    """Convert sorted syscall ranges into a binary search tree of BPF jumps.

    A single range becomes one statement that jumps to the "{fail}" or
    "{allow}" placeholder depending on whether the syscall number is at or
    past the end of the range; the placeholders are filled in later, once the
    size of the filter is known. Several ranges are split in two halves that
    are converted recursively, preceded by a jump that selects the half.

    Note that a leaf only checks the upper end of its range: the lower end is
    established by the enclosing split, or by the caller for the first range.

    Raises:
        ValueError: if `ranges` is empty.
    """
    if not ranges:
        # Without this guard an empty list would recurse without bound.
        raise ValueError("cannot build a BPF tree from an empty range list")

    if len(ranges) == 1:
        # We will replace {fail} and {allow} with appropriate range jumps
        # later
        return [BPF_JGE.format(ranges[0].end, "{fail}", "{allow}") +
                ", //" + "|".join(ranges[0].names)]

    half = (len(ranges) + 1) // 2
    first = convert_to_intermediate_bpf(ranges[:half])
    second = convert_to_intermediate_bpf(ranges[half:])
    # Skip over the first half when the number belongs to the second half.
    jump = [BPF_JGE.format(ranges[half].begin, len(first), 0) + ","]
    return jump + first + second


def convert_priority_to_intermediate_bpf(priority_syscalls):
    """Convert prioritized (name, number) pairs to equality checks.

    The result is placed before the tree generated by
    convert_to_intermediate_bpf(). If one of these syscalls is hit, the
    statement jumps straight to the "{allow}" placeholder.
    """
    return [BPF_JEQ.format(number, "{allow}", 0) + ", //" + name
            for name, number in priority_syscalls]


def convert_ranges_to_bpf(ranges, priority_syscalls):
    """Build the complete list of BPF statements for one filter.

    Layout: lower-bound check, priority checks, (optional tree-floor guard),
    range tree, allow statement. A syscall that is not matched falls through
    past the allow statement, which lets the user match further syscalls and
    choose the action taken when blocking.

    Raises:
        ValueError: if `ranges` is empty.
    """
    if not ranges:
        raise ValueError("cannot build a BPF filter without syscall ranges")

    tree_floor = ranges[0].begin
    # The leading lower-bound check skips the whole filter, priority checks
    # included, so it must not exclude a prioritized syscall whose number is
    # below the first range.
    lowest = min([tree_floor] + [number for _, number in priority_syscalls])

    bpf = convert_priority_to_intermediate_bpf(priority_syscalls)
    if lowest < tree_floor:
        # The tree never checks the lower end of its first range, so once the
        # outer bound has been lowered the floor has to be enforced here.
        bpf.append(BPF_JGE.format(tree_floor, 0, "{fail}") + ",")
    bpf += convert_to_intermediate_bpf(ranges)

    # Now we know the size of the tree, we can substitute the {fail} and
    # {allow} placeholders with
    # "distance to jump to fail, distance to jump to allow".
    # With bpfs jmp 0 means the next statement, so the distance to the allow
    # statement appended below is len(bpf) - i - 1, and the statement after
    # that (len(bpf) - i) is where an unmatched syscall continues.
    size = len(bpf)
    bpf = [statement.format(fail=str(size - i), allow=str(size - i - 1))
           for i, statement in enumerate(bpf)]

    # Add the allow calls at the end. If the syscall is not matched, we will
    # continue. This allows the user to choose to match further syscalls, and
    # also to choose the action when we want to block
    bpf.append(BPF_ALLOW + ",")

    # Add check that we aren't off the bottom of the syscalls
    bpf.insert(0, BPF_JGE.format(lowest, 0, str(len(bpf))) + ',')
    return bpf


def convert_bpf_to_output(bpf, architecture, name_modifier):
    """Wrap BPF statements in the C++ source text of a policy file.

    The array is named `{architecture}_{name_modifier}_filter`, or
    `{architecture}_filter` when `name_modifier` is empty or None.
    """
    if name_modifier:
        name_modifier = name_modifier + "_"
    else:
        name_modifier = ""
    header = textwrap.dedent("""\
        // File autogenerated by {self_path} - edit at your peril!!

        #include <linux/filter.h>
        #include <errno.h>

        #include "seccomp/seccomp_bpfs.h"
        const sock_filter {architecture}_{suffix}filter[] = {{
        """).format(self_path=os.path.basename(__file__),
                    architecture=architecture, suffix=name_modifier)

    footer = textwrap.dedent("""\

        }};

        const size_t {architecture}_{suffix}filter_size = sizeof({architecture}_{suffix}filter) / sizeof(struct sock_filter);
        """).format(architecture=architecture, suffix=name_modifier)
    return header + "\n".join(bpf) + footer


def construct_bpf(syscalls, architecture, name_modifier, priorities):
    """Return the policy source text for a list of (name, number) syscalls."""
    priority_syscalls, other_syscalls = \
        extract_priority_syscalls(syscalls, priorities)
    ranges = convert_NRs_to_ranges(other_syscalls)
    bpf = convert_ranges_to_bpf(ranges, priority_syscalls)
    return convert_bpf_to_output(bpf, architecture, name_modifier)


def gen_policy(name_modifier, out_dir, base_syscall_file, syscall_files,
               syscall_NRs, priority_file):
    """Write one `<arch>[_<name_modifier>]_policy.cpp` file per architecture.

    Files in `syscall_files` whose path contains "blocklist" (case
    insensitive) are treated as blocklists; all others as allowlists.

    Raises:
        KeyError: if an allowed syscall has no number in `syscall_NRs` for
            the architecture being generated.
    """
    # The priority list does not depend on the architecture; read it once.
    priorities = []
    if priority_file:
        priorities = load_syscall_priorities_from_file(priority_file)

    for arch in SupportedArchitectures:
        base_names = load_syscall_names_from_file(base_syscall_file, arch)
        allowlist_names = set()
        blocklist_names = set()
        for f in syscall_files:
            if "blocklist" in f.lower():
                blocklist_names |= load_syscall_names_from_file(f, arch)
            else:
                allowlist_names |= load_syscall_names_from_file(f, arch)

        allowed_syscalls = []
        for name in merge_names(base_names, allowlist_names, blocklist_names):
            try:
                allowed_syscalls.append((name, syscall_NRs[arch][name]))
            except KeyError:
                logging.exception("Failed to find %s in %s", name, arch)
                raise
        output = construct_bpf(allowed_syscalls, arch, name_modifier,
                               priorities)

        # And output policy
        filename_modifier = "_" + name_modifier if name_modifier else ""
        output_path = os.path.join(
            out_dir, "{}{}_policy.cpp".format(arch, filename_modifier))
        with open(output_path, "w") as output_file:
            output_file.write(output)


def main():
    """Parse the command line and generate the policy files."""
    parser = argparse.ArgumentParser(
        description="Generates a seccomp-bpf policy")
    # A plain flag: without store_true argparse would demand a value.
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="Enables verbose logging.")
    parser.add_argument("--name-modifier",
                        help=("Specifies the name modifier for the policy. "
                              "One of {app,system}."))
    parser.add_argument("--out-dir",
                        help="The output directory for the policy files")
    parser.add_argument("base_file", metavar="base-file", type=str,
                        help="The path of the base syscall list "
                             "(SYSCALLS.TXT).")
    parser.add_argument("files", metavar="FILE", type=str, nargs="+",
                        help=("The path of the input files. In order to "
                              "simplify the build rules, it can take any of the "
                              "following files: \n"
                              "* /blocklist.*\\.txt$/ syscall blocklist.\n"
                              "* /allowlist.*\\.txt$/ syscall allowlist.\n"
                              "* /priority.txt$/ priorities for bpf rules.\n"
                              "* otherwise, syscall name-number mapping.\n"))
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    syscall_files = []
    priority_file = None
    syscall_NRs = {}
    for filename in args.files:
        if filename.lower().endswith('.txt'):
            if filename.lower().endswith('priority.txt'):
                priority_file = filename
            else:
                syscall_files.append(filename)
        else:
            # The architecture is encoded in the name of the mapping file.
            m = re.search(r"libseccomp_gen_syscall_nrs_([^/]+)", filename)
            if m is None:
                parser.error(
                    "cannot determine the architecture of %r: expected a "
                    "path containing libseccomp_gen_syscall_nrs_<arch>"
                    % filename)
            syscall_NRs[m.group(1)] = parse_syscall_NRs(filename)

    gen_policy(name_modifier=args.name_modifier, out_dir=args.out_dir,
               syscall_NRs=syscall_NRs, base_syscall_file=args.base_file,
               syscall_files=syscall_files, priority_file=priority_file)


if __name__ == "__main__":
    main()