1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7
8from autotest_lib.client.bin import test
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib import utils
11from autotest_lib.client.common_lib.cros.tendo import webservd_helper
12
13
class security_Firewall(test.test):
    """Tests that the expected iptables/ip6tables rules are present.

    Rules listed in the baseline files but absent on the device fail the
    test. Rules present on the device but absent from the baselines are only
    logged.
    """
    version = 1


    @staticmethod
    def get_firewall_settings(executable):
        """Return the rules currently reported by the given executable.

        Runs "<executable> -S" and collects its output lines.

        @param executable: 'iptables' for IPv4 or 'ip6tables' for IPv6.

        @returns set of strings, one per non-empty output line, with
                 surrounding whitespace stripped.
        """
        rules = utils.system_output("%s -S" % executable)
        stripped = (line.strip() for line in rules.splitlines())
        # Drop empty lines so they are never treated as rules.
        return set(line for line in stripped if line)


    def load_baseline(self, baseline_filename):
        """The baseline file lists the rules that we expect.

        @param baseline_filename: string name of file containing relevant rules.
                                  The name is resolved relative to self.bindir.

        @returns set of strings, one per non-empty line of the file, with
                 surrounding whitespace stripped.
        """
        baseline_path = os.path.join(self.bindir, baseline_filename)
        with open(baseline_path) as f:
            stripped = (line.strip() for line in f)
            # A blank line (e.g. a trailing newline at the end of the file)
            # would otherwise become an empty "rule" that can never be found
            # on the device and would always be reported as missing.
            return set(line for line in stripped if line)


    def dump_rules(self, rules, executable):
        """Store actual rules in results/ for future use.

        Leaves a list of iptables/ip6tables rules in the results dir
        so that we can update the baseline file if necessary.

        @param rules: iterable of strings containing rules we found on the
                      board.
        @param executable: 'iptables' for IPv4 or 'ip6tables' for IPv6.
        """
        dump_path = os.path.join(self.resultsdir, "%s_rules" % executable)
        # The context manager closes the file even if a write fails.
        with open(dump_path, 'w') as outf:
            # Sorted so the dump is stable between runs; callers pass a set,
            # whose iteration order is arbitrary.
            for rule in sorted(rules):
                outf.write(rule + "\n")


    @staticmethod
    def log_error_rules(rules, message):
        """Log a set of rules and the problem with those rules.

        @param rules: iterable of strings containing rules we have issues with.
        @param message: string detailing what our problem with the rules is.
        """
        # Sorted so that the log line is deterministic for a given rule set.
        rules_str = ", ".join(["'%s'" % rule for rule in sorted(rules)])
        logging.error("%s: %s", message, rules_str)


    def run_once(self):
        """Matches found and expected iptables/ip6tables rules.
        Fails only when rules are missing.

        @raises error.TestFail: if any baseline rule is absent from the
                                rules reported by iptables or ip6tables.
        """
        # TODO(wiley) Remove when we get per-board baselines (crbug.com/406013)
        # The webservd baseline does not depend on the executable, so it is
        # read once up front rather than once per loop iteration.
        webserv_rules = self.load_baseline("baseline.webservd")

        failed = False
        for executable in ["iptables", "ip6tables"]:
            baseline = self.load_baseline("baseline.%s" % executable)
            if webservd_helper.webservd_is_running():
                baseline.update(webserv_rules)
            current = self.get_firewall_settings(executable)

            # Save to results dir
            self.dump_rules(current, executable)

            missing_rules = baseline - current
            extra_rules = current - baseline

            if missing_rules:
                failed = True
                self.log_error_rules(missing_rules,
                                     "Missing %s rules" % executable)

            if extra_rules:
                # Extra rules are logged but deliberately do not fail the test.
                # TODO(zqiu): implement a way to verify per-interface rules
                # that are created dynamically.
                self.log_error_rules(extra_rules, "Extra %s rules" % executable)

        if failed:
            raise error.TestFail("Mismatched firewall rules")
94