"""Unit tests for socket timeout feature."""

import errno
import functools
import socket
import time
import unittest
from test import support

try:
    # Newer CPython keeps the networking helpers (HOST, bind_port,
    # transient_internet) in test.support.socket_helper.
    from test.support import socket_helper
except ImportError:
    # Older CPython exposes the same helpers directly on test.support.
    socket_helper = support

# This requires the 'network' resource as given on the regrtest command line.
skip_expected = not support.is_resource_enabled('network')


@functools.lru_cache()
def resolve_address(host, port):
    """Resolve a (host, port) pair to an IPv4 TCP socket address.

    We must perform name resolution before timeout tests, otherwise it will be
    performed by connect().

    Returns the sockaddr of the first getaddrinfo() result.  Successful
    results are memoized by lru_cache, so each pair is resolved only once.
    """
    with socket_helper.transient_internet(host):
        return socket.getaddrinfo(host, port, socket.AF_INET,
                                  socket.SOCK_STREAM)[0][4]


class CreationTestCase(unittest.TestCase):
    """Test case for socket.gettimeout() and socket.settimeout()"""

    def setUp(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def testObjectCreation(self):
        # Test Socket creation
        self.assertIsNone(self.sock.gettimeout(),
                          "timeout not disabled by default")

    def testFloatReturnValue(self):
        # Test return value of gettimeout()
        self.sock.settimeout(7.345)
        self.assertEqual(self.sock.gettimeout(), 7.345)

        self.sock.settimeout(3)
        self.assertEqual(self.sock.gettimeout(), 3)

        self.sock.settimeout(None)
        self.assertIsNone(self.sock.gettimeout())

    def testReturnType(self):
        # Test return type of gettimeout(): always exactly float, even when
        # the timeout was given as an int.
        self.sock.settimeout(1)
        self.assertIs(type(self.sock.gettimeout()), float)

        self.sock.settimeout(3.9)
        self.assertIs(type(self.sock.gettimeout()), float)

    def testTypeCheck(self):
        # Test type checking by settimeout()
        # Accepted: int, float and None.
        self.sock.settimeout(0)
        self.sock.settimeout(0.0)
        self.sock.settimeout(None)
        # Rejected: anything that is not a real number or None.
        self.assertRaises(TypeError, self.sock.settimeout, "")
        self.assertRaises(TypeError, self.sock.settimeout, ())
        self.assertRaises(TypeError, self.sock.settimeout, [])
        self.assertRaises(TypeError, self.sock.settimeout, {})
        self.assertRaises(TypeError, self.sock.settimeout, 0j)

    def testRangeCheck(self):
        # Test range checking by settimeout(): negative values are invalid.
        self.assertRaises(ValueError, self.sock.settimeout, -1)
        self.assertRaises(ValueError, self.sock.settimeout, -1.0)

    def testTimeoutThenBlocking(self):
        # Test settimeout() followed by setblocking()
        self.sock.settimeout(10)
        self.sock.setblocking(1)
        self.assertIsNone(self.sock.gettimeout())
        self.sock.setblocking(0)
        self.assertEqual(self.sock.gettimeout(), 0.0)

        self.sock.settimeout(10)
        self.sock.setblocking(0)
        self.assertEqual(self.sock.gettimeout(), 0.0)
        self.sock.setblocking(1)
        self.assertIsNone(self.sock.gettimeout())

    def testBlockingThenTimeout(self):
        # Test setblocking() followed by settimeout()
        self.sock.setblocking(0)
        self.sock.settimeout(1)
        self.assertEqual(self.sock.gettimeout(), 1)

        self.sock.setblocking(1)
        self.sock.settimeout(1)
        self.assertEqual(self.sock.gettimeout(), 1)


class TimeoutTestCase(unittest.TestCase):
    """Common base for the TCP and UDP timeout test cases.

    Subclasses must override setUp() and tearDown() and provide `self.sock`.
    """

    # There are a number of tests here trying to make sure that an operation
    # doesn't take too much longer than expected.  But competing machine
    # activity makes it inevitable that such tests will fail at times.
    # When fuzz was at 1.0, I (tim) routinely saw bogus failures on Win2K
    # and Win98SE.  Boosting it to 2.0 helped a lot, but isn't a real
    # solution.
    fuzz = 2.0

    localhost = socket_helper.HOST

    def setUp(self):
        raise NotImplementedError()

    tearDown = setUp

    def _sock_operation(self, count, timeout, method, *args):
        """
        Test the specified socket method.

        The method is run at most `count` times and must raise a socket.timeout
        within `timeout` + self.fuzz seconds.

        `method` is the name of a method of self.sock; `args` are passed to
        it unchanged.  The test fails if no call raises socket.timeout.
        """
        self.sock.settimeout(timeout)
        method = getattr(self.sock, method)
        for _ in range(count):
            # Use a monotonic clock so that wall-clock adjustments cannot
            # distort the measured duration.
            started = time.monotonic()
            try:
                method(*args)
            except socket.timeout:
                delta = time.monotonic() - started
                break
        else:
            self.fail('socket.timeout was not raised')
        # These checks should account for timing imprecision
        self.assertLess(delta, timeout + self.fuzz)
        self.assertGreater(delta, timeout - 1.0)


class TCPTimeoutTestCase(TimeoutTestCase):
    """TCP test case for socket.socket() timeout functions"""

    def setUp(self):
        # Resolve the address first: if resolution fails (or the test is
        # skipped because the network is unavailable), tearDown() is not
        # run, so no socket must have been created yet.
        self.addr_remote = resolve_address('www.python.org.', 80)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def testConnectTimeout(self):
        # Testing connect timeout is tricky: we need to have IP connectivity
        # to a host that silently drops our packets.  We can't simulate this
        # from Python because it's a function of the underlying TCP/IP stack.
        # So, the following Snakebite host has been defined:
        blackhole = resolve_address('blackhole.snakebite.net', 56666)

        # Blackhole has been configured to silently drop any incoming packets.
        # No RSTs (for TCP) or ICMP UNREACH (for UDP/ICMP) will be sent back
        # to hosts that attempt to connect to this address: which is exactly
        # what we need to confidently test connect timeout.

        # However, we want to prevent false positives.  It's not unreasonable
        # to expect certain hosts may not be able to reach the blackhole, due
        # to firewalling or general network configuration.  In order to improve
        # our confidence in testing the blackhole, a corresponding 'whitehole'
        # has also been set up using one port higher:
        whitehole = resolve_address('whitehole.snakebite.net', 56667)

        # This address has been configured to immediately drop any incoming
        # packets as well, but it does it respectfully with regards to the
        # incoming protocol.  RSTs are sent for TCP packets, and ICMP UNREACH
        # is sent for UDP/ICMP packets.  This means our attempts to connect to
        # it should be met immediately with ECONNREFUSED.  The test case has
        # been structured around this premise: if we get an ECONNREFUSED from
        # the whitehole, we proceed with testing connect timeout against the
        # blackhole.  If we don't, we skip the test (with a message about not
        # getting the required RST from the whitehole within the required
        # timeframe).

        # For the records, the whitehole/blackhole configuration has been set
        # up using the 'pf' firewall (available on BSDs), using the following:
        #
        #   ext_if="bge0"
        #
        #   blackhole_ip="35.8.247.6"
        #   whitehole_ip="35.8.247.6"
        #   blackhole_port="56666"
        #   whitehole_port="56667"
        #
        #   block return in log quick on $ext_if proto { tcp udp } \
        #       from any to $whitehole_ip port $whitehole_port
        #   block drop in log quick on $ext_if proto { tcp udp } \
        #       from any to $blackhole_ip port $blackhole_port
        #

        skip = True
        # Use a timeout of 3 seconds.  Why 3?  Because it's more than 1, and
        # less than 5.  i.e. no particular reason.  Feel free to tweak it if
        # you feel a different value would be more appropriate.
        timeout = 3
        # The probe socket is closed by the with-block whatever happens.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(timeout)
            try:
                probe.connect(whitehole)
            except socket.timeout:
                # No RST arrived in time: leave skip set.
                pass
            except OSError as err:
                if err.errno == errno.ECONNREFUSED:
                    skip = False

        if skip:
            self.skipTest(
                "We didn't receive a connection reset (RST) packet from "
                "{}:{} within {} seconds, so we're unable to test connect "
                "timeout against the corresponding {}:{} (which is "
                "configured to silently drop packets)."
                .format(
                    whitehole[0],
                    whitehole[1],
                    timeout,
                    blackhole[0],
                    blackhole[1],
                )
            )

        # All that hard work just to test if connect times out in 0.001s ;-)
        self.addr_remote = blackhole
        with socket_helper.transient_internet(self.addr_remote[0]):
            self._sock_operation(1, 0.001, 'connect', self.addr_remote)

    def testRecvTimeout(self):
        # Test recv() timeout
        with socket_helper.transient_internet(self.addr_remote[0]):
            self.sock.connect(self.addr_remote)
            self._sock_operation(1, 1.5, 'recv', 1024)

    def testAcceptTimeout(self):
        # Test accept() timeout
        socket_helper.bind_port(self.sock, self.localhost)
        self.sock.listen()
        self._sock_operation(1, 1.5, 'accept')

    def testSend(self):
        # Test send() timeout
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
            socket_helper.bind_port(serv, self.localhost)
            serv.listen()
            self.sock.connect(serv.getsockname())
            # Send a lot of data in order to bypass buffering in the TCP stack.
            self._sock_operation(100, 1.5, 'send', b"X" * 200000)

    def testSendto(self):
        # Test sendto() timeout
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
            socket_helper.bind_port(serv, self.localhost)
            serv.listen()
            self.sock.connect(serv.getsockname())
            # The address argument is ignored since we already connected.
            self._sock_operation(100, 1.5, 'sendto', b"X" * 200000,
                                 serv.getsockname())

    def testSendall(self):
        # Test sendall() timeout
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
            socket_helper.bind_port(serv, self.localhost)
            serv.listen()
            self.sock.connect(serv.getsockname())
            # Send a lot of data in order to bypass buffering in the TCP stack.
            self._sock_operation(100, 1.5, 'sendall', b"X" * 200000)


class UDPTimeoutTestCase(TimeoutTestCase):
    """UDP test case for socket.socket() timeout functions"""

    def setUp(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def tearDown(self):
        self.sock.close()

    def testRecvfromTimeout(self):
        # Test recvfrom() timeout
        # Prevent "Address already in use" socket exceptions
        socket_helper.bind_port(self.sock, self.localhost)
        self._sock_operation(1, 1.5, 'recvfrom', 1024)


def test_main():
    """Run all timeout test cases; requires the 'network' resource."""
    support.requires('network')
    support.run_unittest(
        CreationTestCase,
        TCPTimeoutTestCase,
        UDPTimeoutTestCase,
    )


if __name__ == "__main__":
    test_main()