1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus
6import logging
7import os
8import time
9
10from autotest_lib.client.bin import test, utils
11from autotest_lib.client.common_lib import error
12
13
14class platform_Firewall(test.test):
15    """Ensure the firewall service is working correctly."""
16
17    version = 1
18
19    _PORT = 1234
20    _IFACE = "eth0"
21
22    _TCP_RULE = "-A INPUT -p tcp -m tcp --dport %d -j ACCEPT" % _PORT
23    _UDP_RULE = "-A INPUT -p udp -m udp --dport %d -j ACCEPT" % _PORT
24    _IFACE_RULE = "-A INPUT -i %s -p tcp -m tcp --dport %d -j ACCEPT" % (_IFACE,
25                                                                         _PORT)
26
27    _POLL_INTERVAL = 5
28
29    _IPTABLES_DEL_CMD = "%s -D INPUT -p %s -m %s --dport %d -j ACCEPT"
30
31    @staticmethod
32    def _iptables_rules(executable):
33        rule_output = utils.system_output("%s -S" % executable)
34        logging.debug(rule_output)
35        return [line.strip() for line in rule_output.splitlines()]
36
37
38    @staticmethod
39    def _check(expected_rule, actual_rules, error_msg, executable, check):
40        # If check() returns false, fail the test.
41        if not check(expected_rule, actual_rules):
42            raise error.TestFail(error_msg % executable)
43
44
45    @staticmethod
46    def _check_included(expected_rule, actual_rules, error_msg, executable):
47        # Test whether the rule is included, fail if it's not.
48        platform_Firewall._check(
49                expected_rule, actual_rules, error_msg, executable,
50                lambda e, a: e in a)
51
52
53    @staticmethod
54    def _check_not_included(expected_rule, actual_rules, error_msg, executable):
55        # Test whether the rule is not included, fail if it is.
56        platform_Firewall._check(
57                expected_rule, actual_rules, error_msg, executable,
58                lambda e, a: e not in a)
59
60
61    def run_once(self):
62        # Create lifeline file descriptors.
63        self.tcp_r, self.tcp_w = os.pipe()
64        self.udp_r, self.udp_w = os.pipe()
65        self.iface_r, self.iface_w = os.pipe()
66
67        try:
68            bus = dbus.SystemBus()
69            pb_proxy = bus.get_object('org.chromium.PermissionBroker',
70                                      '/org/chromium/PermissionBroker')
71            pb = dbus.Interface(pb_proxy, 'org.chromium.PermissionBroker')
72
73            tcp_lifeline = dbus.types.UnixFd(self.tcp_r)
74            ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT), "",
75                                          tcp_lifeline)
76            # |ret| is a dbus.Boolean, but compares as int.
77            if ret == 0:
78                raise error.TestFail("RequestTcpPortAccess returned false.")
79
80            udp_lifeline = dbus.types.UnixFd(self.udp_r)
81            ret = pb.RequestUdpPortAccess(dbus.UInt16(self._PORT), "",
82                                          udp_lifeline)
83            # |ret| is a dbus.Boolean, but compares as int.
84            if ret == 0:
85                raise error.TestFail("RequestUdpPortAccess returned false.")
86
87            iface_lifeline = dbus.types.UnixFd(self.iface_r)
88            ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT),
89                                          dbus.String(self._IFACE),
90                                          iface_lifeline)
91            # |ret| is a dbus.Boolean, but compares as int.
92            if ret == 0:
93                raise error.TestFail(
94                        "RequestTcpPortAccess(port, interface) returned false.")
95
96            # Test IPv4 and IPv6.
97            for executable in ["iptables", "ip6tables"]:
98                actual_rules = self._iptables_rules(executable)
99                self._check_included(
100                        self._TCP_RULE, actual_rules,
101                        "RequestTcpPortAccess did not add %s rule.",
102                        executable)
103                self._check_included(
104                        self._UDP_RULE, actual_rules,
105                        "RequestUdpPortAccess did not add %s rule.",
106                        executable)
107                self._check_included(
108                        self._IFACE_RULE, actual_rules,
109                        "RequestTcpPortAccess(port, interface)"
110                        " did not add %s rule.",
111                        executable)
112
113            ret = pb.ReleaseTcpPort(dbus.UInt16(self._PORT), "")
114            # |ret| is a dbus.Boolean, but compares as int.
115            if ret == 0:
116                raise error.TestFail("ReleaseTcpPort returned false.")
117
118            ret = pb.ReleaseUdpPort(dbus.UInt16(self._PORT), "")
119            # |ret| is a dbus.Boolean, but compares as int.
120            if ret == 0:
121                raise error.TestFail("ReleaseUdpPort returned false.")
122
123            # Test IPv4 and IPv6.
124            for executable in ["iptables", "ip6tables"]:
125                rules = self._iptables_rules(executable)
126                self._check_not_included(
127                        self._TCP_RULE, rules,
128                        "ReleaseTcpPortAccess did not remove %s rule.",
129                        executable)
130                self._check_not_included(
131                        self._UDP_RULE, rules,
132                        "ReleaseUdpPortAccess did not remove %s rule.",
133                        executable)
134
135            # permission_broker should plug the firewall hole
136            # when the requesting process exits.
137            # Simulate the process exiting by closing |iface_w|.
138            os.close(self.iface_w)
139
140            # permission_broker checks every |_POLL_INTERVAL| seconds
141            # for processes that have exited.
142            # This is ugly, but it's either this or polling /var/log/messages.
143            time.sleep(self._POLL_INTERVAL + 1)
144            # Test IPv4 and IPv6.
145            for executable in ["iptables", "ip6tables"]:
146                rules = self._iptables_rules(executable)
147                self._check_not_included(
148                        self._IFACE_RULE, rules,
149                        "permission_broker did not remove %s rule.",
150                        executable)
151
152        except dbus.DBusException as e:
153            raise error.TestFail("D-Bus error: " + e.get_dbus_message())
154
155
156    def cleanup(self):
157        # File descriptors could already be closed.
158        try:
159            os.close(self.tcp_w)
160            os.close(self.udp_w)
161            os.close(self.iface_w)
162        except OSError:
163            pass
164
165        # We don't want the cleanup() method to fail, so we ignore exit codes.
166        # This also allows us to clean up iptables rules unconditionally.
167        # The command will fail if the rule has already been deleted,
168        # but it won't fail the test.
169        for executable in ["iptables", "ip6tables"]:
170            cmd = self._IPTABLES_DEL_CMD % (executable, "tcp", "tcp",
171                                            self._PORT)
172            utils.system(cmd, ignore_status=True)
173            cmd = self._IPTABLES_DEL_CMD % (executable, "udp", "udp",
174                                            self._PORT)
175            utils.system(cmd, ignore_status=True)
176            cmd = self._IPTABLES_DEL_CMD % (executable, "tcp", "tcp",
177                                            self._PORT)
178            cmd += " -i %s" % self._IFACE
179            utils.system(cmd, ignore_status=True)
180