1"""Unit tests for socket timeout feature."""
2
3import functools
4import unittest
5from test import support
6
# These tests need the 'network' resource, which is enabled on the regrtest
# command line.  skip_expected is True when that resource is not enabled.
skip_expected = not support.is_resource_enabled('network')
9
10import time
11import errno
12import socket
13
14
@functools.lru_cache()
def resolve_address(host, port):
    """Return the first IPv4 stream address found for (host, port).

    The lookup is done here, ahead of the timeout tests, so that connect()
    does not have to perform name resolution itself while being timed.
    Results are memoized by the lru_cache decorator.
    """
    with support.transient_internet(host):
        candidates = socket.getaddrinfo(host, port, socket.AF_INET,
                                        socket.SOCK_STREAM)
        first_entry = candidates[0]
        # Item 4 of a getaddrinfo() entry is the socket address.
        sockaddr = first_entry[4]
        return sockaddr
25
26
class CreationTestCase(unittest.TestCase):
    """Test case for socket.gettimeout() and socket.settimeout()

    Every test runs against a fresh, unconnected IPv4 stream socket that is
    created in setUp() and closed in tearDown().
    """

    def setUp(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def testObjectCreation(self):
        # Test Socket creation: no timeout must be set by default
        self.assertIsNone(self.sock.gettimeout(),
                          "timeout not disabled by default")

    def testFloatReturnValue(self):
        # Test return value of gettimeout()
        self.sock.settimeout(7.345)
        self.assertEqual(self.sock.gettimeout(), 7.345)

        self.sock.settimeout(3)
        self.assertEqual(self.sock.gettimeout(), 3)

        self.sock.settimeout(None)
        self.assertIsNone(self.sock.gettimeout())

    def testReturnType(self):
        # Test return type of gettimeout(): a float, whether the timeout
        # was given as an int or as a float
        self.sock.settimeout(1)
        self.assertIs(type(self.sock.gettimeout()), float)

        self.sock.settimeout(3.9)
        self.assertIs(type(self.sock.gettimeout()), float)

    def testTypeCheck(self):
        # Test type checking by settimeout()
        # Accepted: int, float and None.
        for accepted in (0, 0.0, None):
            self.sock.settimeout(accepted)
        # Rejected: anything that is neither a real number nor None.
        for rejected in ("", (), [], {}, 0j):
            with self.subTest(value=rejected):
                self.assertRaises(TypeError, self.sock.settimeout, rejected)

    def testRangeCheck(self):
        # Test range checking by settimeout(): negative values are refused
        for negative in (-1, -1.0):
            with self.subTest(value=negative):
                self.assertRaises(ValueError, self.sock.settimeout, negative)

    def testTimeoutThenBlocking(self):
        # Test settimeout() followed by setblocking()
        self.sock.settimeout(10)
        self.sock.setblocking(True)
        self.assertIsNone(self.sock.gettimeout())
        self.sock.setblocking(False)
        self.assertEqual(self.sock.gettimeout(), 0.0)

        self.sock.settimeout(10)
        self.sock.setblocking(False)
        self.assertEqual(self.sock.gettimeout(), 0.0)
        self.sock.setblocking(True)
        self.assertIsNone(self.sock.gettimeout())

    def testBlockingThenTimeout(self):
        # Test setblocking() followed by settimeout(): the later
        # settimeout() call decides what gettimeout() reports
        self.sock.setblocking(False)
        self.sock.settimeout(1)
        self.assertEqual(self.sock.gettimeout(), 1)

        self.sock.setblocking(True)
        self.sock.settimeout(1)
        self.assertEqual(self.sock.gettimeout(), 1)
102
103
class TimeoutTestCase(unittest.TestCase):
    """Shared helpers for the timeout test cases.

    Subclasses must override setUp() and tearDown(); setUp() is expected to
    store the socket under test in self.sock, which _sock_operation() uses.
    """

    # There are a number of tests here trying to make sure that an operation
    # doesn't take too much longer than expected.  But competing machine
    # activity makes it inevitable that such tests will fail at times.
    # When fuzz was at 1.0, I (tim) routinely saw bogus failures on Win2K
    # and Win98SE.  Boosting it to 2.0 helped a lot, but isn't a real
    # solution.
    fuzz = 2.0

    localhost = support.HOST

    def setUp(self):
        raise NotImplementedError()

    # tearDown is the same function as setUp, so it raises as well unless a
    # subclass overrides it.
    tearDown = setUp

    def _sock_operation(self, count, timeout, method, *args):
        """
        Test the specified socket method.

        The method is run at most `count` times and must raise a socket.timeout
        within `timeout` + self.fuzz seconds.

        `method` is the name of a method of self.sock; `args` are passed to
        it unchanged on every attempt.  The test fails if no attempt raises
        socket.timeout, or if the attempt that did took `timeout` + self.fuzz
        seconds or more, or `timeout` - 1.0 seconds or less.
        """
        self.sock.settimeout(timeout)
        operation = getattr(self.sock, method)
        for _ in range(count):
            # Use the monotonic clock: unlike time.time() it cannot jump when
            # the system clock is adjusted while the operation is in progress.
            started = time.monotonic()
            try:
                operation(*args)
            except socket.timeout:
                delta = time.monotonic() - started
                break
        else:
            self.fail('socket.timeout was not raised')
        # These checks should account for timing imprecision
        self.assertLess(delta, timeout + self.fuzz)
        self.assertGreater(delta, timeout - 1.0)
141
142
class TCPTimeoutTestCase(TimeoutTestCase):
    """TCP test case for socket.socket() timeout functions"""

    def setUp(self):
        # Resolve the remote address before creating the socket: if the
        # lookup raises, setUp() fails and unittest does not call tearDown(),
        # so a socket created beforehand would never be closed.
        self.addr_remote = resolve_address('www.python.org.', 80)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def testConnectTimeout(self):
        # Testing connect timeout is tricky: we need to have IP connectivity
        # to a host that silently drops our packets.  We can't simulate this
        # from Python because it's a function of the underlying TCP/IP stack.
        # So, the following Snakebite host has been defined:
        blackhole = resolve_address('blackhole.snakebite.net', 56666)

        # Blackhole has been configured to silently drop any incoming packets.
        # No RSTs (for TCP) or ICMP UNREACH (for UDP/ICMP) will be sent back
        # to hosts that attempt to connect to this address: which is exactly
        # what we need to confidently test connect timeout.

        # However, we want to prevent false positives.  It's not unreasonable
        # to expect certain hosts may not be able to reach the blackhole, due
        # to firewalling or general network configuration.  In order to improve
        # our confidence in testing the blackhole, a corresponding 'whitehole'
        # has also been set up using one port higher:
        whitehole = resolve_address('whitehole.snakebite.net', 56667)

        # This address has been configured to immediately drop any incoming
        # packets as well, but it does it respectfully with regards to the
        # incoming protocol.  RSTs are sent for TCP packets, and ICMP UNREACH
        # is sent for UDP/ICMP packets.  This means our attempts to connect to
        # it should be met immediately with ECONNREFUSED.  The test case has
        # been structured around this premise: if we get an ECONNREFUSED from
        # the whitehole, we proceed with testing connect timeout against the
        # blackhole.  If we don't, we skip the test (with a message about not
        # getting the required RST from the whitehole within the required
        # timeframe).

        # For the record, the whitehole/blackhole configuration has been set
        # up using the 'pf' firewall (available on BSDs), using the following:
        #
        #   ext_if="bge0"
        #
        #   blackhole_ip="35.8.247.6"
        #   whitehole_ip="35.8.247.6"
        #   blackhole_port="56666"
        #   whitehole_port="56667"
        #
        #   block return in log quick on $ext_if proto { tcp udp } \
        #       from any to $whitehole_ip port $whitehole_port
        #   block drop in log quick on $ext_if proto { tcp udp } \
        #       from any to $blackhole_ip port $blackhole_port
        #

        # Only an explicit ECONNREFUSED from the whitehole clears this flag;
        # a timeout, any other error, or even a successful connect leaves it
        # set and the test is skipped.
        skip = True
        # Use a timeout of 3 seconds.  Why 3?  Because it's more than 1, and
        # less than 5.  i.e. no particular reason.  Feel free to tweak it if
        # you feel a different value would be more appropriate.
        timeout = 3
        # Probe the whitehole with a throwaway socket; the with-block closes
        # it however the probe ends.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            try:
                sock.connect(whitehole)
            except socket.timeout:
                # No RST arrived within the timeout: keep skip set.
                pass
            except OSError as err:
                if err.errno == errno.ECONNREFUSED:
                    skip = False

        if skip:
            self.skipTest(
                "We didn't receive a connection reset (RST) packet from "
                "{}:{} within {} seconds, so we're unable to test connect "
                "timeout against the corresponding {}:{} (which is "
                "configured to silently drop packets)."
                    .format(
                        whitehole[0],
                        whitehole[1],
                        timeout,
                        blackhole[0],
                        blackhole[1],
                    )
            )

        # All that hard work just to test if connect times out in 0.001s ;-)
        self.addr_remote = blackhole
        with support.transient_internet(self.addr_remote[0]):
            self._sock_operation(1, 0.001, 'connect', self.addr_remote)

    def testRecvTimeout(self):
        # Test recv() timeout
        with support.transient_internet(self.addr_remote[0]):
            self.sock.connect(self.addr_remote)
            self._sock_operation(1, 1.5, 'recv', 1024)

    def testAcceptTimeout(self):
        # Test accept() timeout
        support.bind_port(self.sock, self.localhost)
        self.sock.listen()
        self._sock_operation(1, 1.5, 'accept')

    def testSend(self):
        # Test send() timeout
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
            support.bind_port(serv, self.localhost)
            serv.listen()
            self.sock.connect(serv.getsockname())
            # Send a lot of data in order to bypass buffering in the TCP stack.
            self._sock_operation(100, 1.5, 'send', b"X" * 200000)

    def testSendto(self):
        # Test sendto() timeout
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
            support.bind_port(serv, self.localhost)
            serv.listen()
            self.sock.connect(serv.getsockname())
            # The address argument is ignored since we already connected.
            self._sock_operation(100, 1.5, 'sendto', b"X" * 200000,
                                 serv.getsockname())

    def testSendall(self):
        # Test sendall() timeout
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
            support.bind_port(serv, self.localhost)
            serv.listen()
            self.sock.connect(serv.getsockname())
            # Send a lot of data in order to bypass buffering in the TCP stack.
            self._sock_operation(100, 1.5, 'sendall', b"X" * 200000)
276
277
class UDPTimeoutTestCase(TimeoutTestCase):
    """Timeout checks for socket.socket() on a UDP (datagram) socket"""

    def setUp(self):
        # Every test gets its own IPv4 datagram socket.
        family, kind = socket.AF_INET, socket.SOCK_DGRAM
        self.sock = socket.socket(family, kind)

    def tearDown(self):
        self.sock.close()

    def testRecvfromTimeout(self):
        # recvfrom() must time out.
        # The socket is bound through support.bind_port() to prevent
        # "Address already in use" socket exceptions.
        support.bind_port(self.sock, self.localhost)
        attempts, timeout, bufsize = 1, 1.5, 1024
        self._sock_operation(attempts, timeout, 'recvfrom', bufsize)
292
293
def test_main():
    """Run every test case of this module; needs the 'network' resource."""
    support.requires('network')
    test_cases = (CreationTestCase, TCPTimeoutTestCase, UDPTimeoutTestCase)
    support.run_unittest(*test_cases)
301
# Run the whole test suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()
304