# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dbus
import logging
import os
import time

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error


class platform_Firewall(test.test):
    """Ensure the firewall service is working correctly.

    The test asks permission_broker over D-Bus to open TCP and UDP port
    |_PORT| (once for all interfaces, once for |_IFACE| only), checks that
    the matching ACCEPT rules show up in both iptables and ip6tables, and
    then checks that they disappear again when the ports are released
    explicitly or when the lifeline pipe is closed.
    """

    version = 1

    _PORT = 1234
    _IFACE = "eth0"

    _TCP_RULE = "-A INPUT -p tcp -m tcp --dport %d -j ACCEPT" % _PORT
    _UDP_RULE = "-A INPUT -p udp -m udp --dport %d -j ACCEPT" % _PORT
    _IFACE_RULE = "-A INPUT -i %s -p tcp -m tcp --dport %d -j ACCEPT" % (_IFACE,
                                                                         _PORT)

    _POLL_INTERVAL = 5

    _IPTABLES_DEL_CMD = "%s -D INPUT -p %s -m %s --dport %d -j ACCEPT"

    # Every rule is checked for both IPv4 and IPv6.
    _EXECUTABLES = ("iptables", "ip6tables")

    # Names of the instance attributes that hold lifeline pipe descriptors.
    # Write ends come first so cleanup() closes them in the original order.
    _LIFELINE_FD_ATTRS = ("tcp_w", "udp_w", "iface_w",
                          "tcp_r", "udp_r", "iface_r")

    @staticmethod
    def _iptables_rules(executable):
        """Return the current rules of |executable| as stripped lines.

        @param executable: "iptables" or "ip6tables".
        @returns: list of rule strings, one per line of `<executable> -S`.
        """
        rule_output = utils.system_output("%s -S" % executable)
        logging.debug(rule_output)
        return [line.strip() for line in rule_output.splitlines()]


    @staticmethod
    def _check(expected_rule, actual_rules, error_msg, executable, check):
        """Fail the test unless check(expected_rule, actual_rules) holds.

        @param expected_rule: rule string being looked for.
        @param actual_rules: list of rule strings currently installed.
        @param error_msg: failure message with one %s for |executable|.
        @param executable: name of the iptables binary that was queried.
        @param check: predicate taking (expected_rule, actual_rules).
        @raises error.TestFail: if |check| returns a false value.
        """
        if not check(expected_rule, actual_rules):
            raise error.TestFail(error_msg % executable)


    @staticmethod
    def _check_included(expected_rule, actual_rules, error_msg, executable):
        """Fail the test if |expected_rule| is not in |actual_rules|."""
        platform_Firewall._check(
                expected_rule, actual_rules, error_msg, executable,
                lambda e, a: e in a)


    @staticmethod
    def _check_not_included(expected_rule, actual_rules, error_msg, executable):
        """Fail the test if |expected_rule| is in |actual_rules|."""
        platform_Firewall._check(
                expected_rule, actual_rules, error_msg, executable,
                lambda e, a: e not in a)


    @staticmethod
    def _require_success(ret, method_desc):
        """Fail the test if a permission_broker call returned false.

        @param ret: value returned by the D-Bus method call.
        @param method_desc: text naming the call, used in the message.
        @raises error.TestFail: if |ret| compares equal to 0.
        """
        # |ret| is a dbus.Boolean, but compares as int.
        if ret == 0:
            raise error.TestFail("%s returned false." % method_desc)


    def _close_fd(self, attr_name):
        """Close the descriptor stored in |attr_name|, at most once.

        The attribute is reset to None before closing so that a later call
        cannot close the same descriptor number a second time. Missing
        attributes and already-closed descriptors are ignored.

        @param attr_name: name of the instance attribute holding the fd.
        """
        fd = getattr(self, attr_name, None)
        if fd is None:
            return
        setattr(self, attr_name, None)
        try:
            os.close(fd)
        except OSError:
            # The descriptor could already be closed; nothing left to do.
            pass


    def run_once(self):
        """Open firewall holes through permission_broker and verify them.

        @raises error.TestFail: if a D-Bus call returns false or raises, or
                if an expected rule is missing / an unexpected rule remains.
        """
        # Create lifeline file descriptors.
        self.tcp_r, self.tcp_w = os.pipe()
        self.udp_r, self.udp_w = os.pipe()
        self.iface_r, self.iface_w = os.pipe()

        try:
            bus = dbus.SystemBus()
            pb_proxy = bus.get_object('org.chromium.PermissionBroker',
                                      '/org/chromium/PermissionBroker')
            pb = dbus.Interface(pb_proxy, 'org.chromium.PermissionBroker')

            tcp_lifeline = dbus.types.UnixFd(self.tcp_r)
            ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT), "",
                                          tcp_lifeline)
            self._require_success(ret, "RequestTcpPortAccess")

            udp_lifeline = dbus.types.UnixFd(self.udp_r)
            ret = pb.RequestUdpPortAccess(dbus.UInt16(self._PORT), "",
                                          udp_lifeline)
            self._require_success(ret, "RequestUdpPortAccess")

            iface_lifeline = dbus.types.UnixFd(self.iface_r)
            ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT),
                                          dbus.String(self._IFACE),
                                          iface_lifeline)
            self._require_success(ret,
                                  "RequestTcpPortAccess(port, interface)")

            # Test IPv4 and IPv6.
            for executable in self._EXECUTABLES:
                actual_rules = self._iptables_rules(executable)
                self._check_included(
                        self._TCP_RULE, actual_rules,
                        "RequestTcpPortAccess did not add %s rule.",
                        executable)
                self._check_included(
                        self._UDP_RULE, actual_rules,
                        "RequestUdpPortAccess did not add %s rule.",
                        executable)
                self._check_included(
                        self._IFACE_RULE, actual_rules,
                        "RequestTcpPortAccess(port, interface)"
                        " did not add %s rule.",
                        executable)

            ret = pb.ReleaseTcpPort(dbus.UInt16(self._PORT), "")
            self._require_success(ret, "ReleaseTcpPort")

            ret = pb.ReleaseUdpPort(dbus.UInt16(self._PORT), "")
            self._require_success(ret, "ReleaseUdpPort")

            # Test IPv4 and IPv6.
            for executable in self._EXECUTABLES:
                rules = self._iptables_rules(executable)
                self._check_not_included(
                        self._TCP_RULE, rules,
                        "ReleaseTcpPortAccess did not remove %s rule.",
                        executable)
                self._check_not_included(
                        self._UDP_RULE, rules,
                        "ReleaseUdpPortAccess did not remove %s rule.",
                        executable)

            # permission_broker should plug the firewall hole
            # when the requesting process exits.
            # Simulate the process exiting by closing |iface_w|.
            os.close(self.iface_w)
            # Forget the descriptor: the iptables subprocesses run below may
            # reuse its number, and cleanup() must not close it again.
            self.iface_w = None

            # permission_broker checks every |_POLL_INTERVAL| seconds
            # for processes that have exited.
            # This is ugly, but it's either this or polling /var/log/messages.
            time.sleep(self._POLL_INTERVAL + 1)
            # Test IPv4 and IPv6.
            for executable in self._EXECUTABLES:
                rules = self._iptables_rules(executable)
                self._check_not_included(
                        self._IFACE_RULE, rules,
                        "permission_broker did not remove %s rule.",
                        executable)

        except dbus.DBusException as e:
            raise error.TestFail("D-Bus error: " + e.get_dbus_message())


    def cleanup(self):
        """Close the lifeline pipes and delete any leftover firewall rules."""
        # Close each descriptor independently so one failure does not skip
        # the others. The read ends are closed too; run_once() only hands
        # them to dbus.types.UnixFd and never closes them itself.
        # NOTE(review): this relies on UnixFd keeping its own duplicate of
        # the descriptor, as dbus-python documents -- confirm for the
        # dbus-python version in use.
        for attr_name in self._LIFELINE_FD_ATTRS:
            self._close_fd(attr_name)

        # We don't want the cleanup() method to fail, so we ignore exit codes.
        # This also allows us to clean up iptables rules unconditionally.
        # The command will fail if the rule has already been deleted,
        # but it won't fail the test.
        rule_variants = (
                ("tcp", ""),
                ("udp", ""),
                ("tcp", " -i %s" % self._IFACE),
        )
        for executable in self._EXECUTABLES:
            for protocol, suffix in rule_variants:
                cmd = self._IPTABLES_DEL_CMD % (executable, protocol, protocol,
                                                self._PORT)
                utils.system(cmd + suffix, ignore_status=True)