"""Network tests for urllib.request.

These tests talk to live HTTP and FTP hosts, so the module requires the
"network" test resource (see the ``support.requires`` call below).
"""

import unittest
from test import support
from test.support import socket_helper
from test.test_urllib2 import sanepathname2url

import os
import socket
import urllib.error
import urllib.request
import sys

support.requires("network")


def _retry_thrice(func, exc, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying on *exc* up to three times.

    Returns the result of the first attempt that does not raise *exc*.
    If all three attempts raise *exc*, the exception from the last attempt
    is re-raised.  Exceptions that do not match *exc* propagate at once.
    """
    for _ in range(3):
        try:
            return func(*args, **kwargs)
        except exc as e:
            # "e" is unbound when the except clause ends; keep a reference
            # so the final failure can be re-raised after the loop.
            last_exc = e
    raise last_exc


def _wrap_with_retry_thrice(func, exc):
    """Return a callable that runs *func* through _retry_thrice() on *exc*."""
    def wrapped(*args, **kwargs):
        return _retry_thrice(func, exc, *args, **kwargs)
    return wrapped


# bpo-35411: FTP tests of test_urllib2net randomly fail
# with "425 Security: Bad IP connecting" on Travis CI
skip_ftp_test_on_travis = unittest.skipIf('TRAVIS' in os.environ,
                                          'bpo-35411: skip FTP test '
                                          'on Travis CI')


# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
                                              urllib.error.URLError)


class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features."""

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
#
#    def test_basic_auth(self):
#        import http.client
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(http.client.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")


class CloseSocketTest(unittest.TestCase):
    """Check that closing a response also closes its underlying file."""

    def test_close(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        # calling .close() on urllib2's response objects should close the
        # underlying socket
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            response = _urlopen_with_retry(url)
            sock = response.fp
            self.assertFalse(sock.closed)
            response.close()
            self.assertTrue(sock.closed)


class OtherNetworkTests(unittest.TestCase):
    """Smoke tests that open FTP, file and HTTP URLs."""

    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    @skip_ftp_test_on_travis
    def test_ftp(self):
        urls = [
            'ftp://www.pythontest.net/README',
            ('ftp://www.pythontest.net/non-existent-file',
             None, urllib.error.URLError),
            ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        TESTFN = support.TESTFN
        with open(TESTFN, 'w') as f:
            # Register the removal as soon as the file exists so that it is
            # deleted even if the write or the URL checks below fail.
            self.addCleanup(os.remove, TESTFN)
            f.write('hi there\n')

        urls = [
            'file:' + sanepathname2url(os.path.abspath(TESTFN)),
            ('file:///nonsensename/etc/passwd', None,
             urllib.error.URLError),
            ]
        self._test_urls(urls, self._extra_handlers(), retry=True)

        self.assertRaises(ValueError, urllib.request.urlopen,
                          './relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]

##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')

##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        urlwith_frag = "http://www.pythontest.net/index.html#frag"
        with socket_helper.transient_internet(urlwith_frag):
            req = urllib.request.Request(urlwith_frag)
            # Use the response as a context manager so it is always closed.
            with urllib.request.urlopen(req) as res:
                self.assertEqual(res.geturl(),
                        "http://www.pythontest.net/index.html#frag")

    def test_redirect_url_withfrag(self):
        redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
        with socket_helper.transient_internet(redirect_url_with_frag):
            req = urllib.request.Request(redirect_url_with_frag)
            # Use the response as a context manager so it is always closed.
            with urllib.request.urlopen(req) as res:
                self.assertEqual(res.geturl(),
                        "http://www.pythontest.net/elsewhere/#frag")

    def test_custom_headers(self):
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            # Only the effect on the request's headers matters here; close
            # each response right away instead of leaking it.
            opener.open(request).close()
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent', 'Test-Agent')
            opener.open(request).close()
            self.assertEqual(request.get_header('User-agent'), 'Test-Agent')

    @unittest.skip('XXX: http://www.imdb.com is gone')
    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com'  # mangles Connection:close

        with socket_helper.transient_internet(URL):
            try:
                with urllib.request.urlopen(URL) as res:
                    pass
            except ValueError:
                self.fail("urlopen failed for site not sending "
                          "Connection:close")
            else:
                self.assertTrue(res)

            with urllib.request.urlopen(URL) as req:
                res = req.read()
            self.assertTrue(res)

    def _test_urls(self, urls, handlers, retry=True):
        """Open every entry of *urls* with an opener built from *handlers*.

        Each entry is either a URL string, which must open and be readable,
        or a ``(url, data, expected_error)`` tuple.  For a tuple, an OSError
        raised while opening must be an instance of ``expected_error``; if
        ``expected_error`` is false, the error is re-raised.

        When *retry* is true, opening is retried on urllib.error.URLError
        via _wrap_with_retry_thrice().
        """
        import time
        import logging
        debug = logging.getLogger("test_urllib2").debug

        urlopen = urllib.request.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)

        for url in urls:
            with self.subTest(url=url):
                if isinstance(url, tuple):
                    url, req, expected_err = url
                else:
                    req = expected_err = None

                with socket_helper.transient_internet(url):
                    try:
                        f = urlopen(url, req, support.INTERNET_TIMEOUT)
                    # urllib.error.URLError is a subclass of OSError
                    except OSError as err:
                        if expected_err:
                            msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                                   (expected_err, url, req, type(err), err))
                            self.assertIsInstance(err, expected_err, msg)
                        else:
                            raise
                    else:
                        try:
                            with support.time_out, \
                                 support.socket_peer_reset, \
                                 support.ioerror_peer_reset:
                                buf = f.read()
                                debug("read %d bytes" % len(buf))
                        except socket.timeout:
                            print("<timeout: %s>" % url, file=sys.stderr)
                        finally:
                            # Close the response even when read() fails with
                            # something other than a timeout.
                            f.close()
                time.sleep(0.1)

    def _extra_handlers(self):
        """Return the extra handlers to pass to build_opener().

        Currently a single CacheFTPHandler with a timeout of 1, whose cache
        is cleared when the test finishes.
        """
        handlers = []

        cfh = urllib.request.CacheFTPHandler()
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers


class TimeoutTest(unittest.TestCase):
    """Check which timeout ends up on the socket behind a response."""

    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

    def test_http_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url, timeout=None):
            u = _urlopen_with_retry(url)
            self.addCleanup(u.close)
            self.assertIsNone(u.fp.raw._sock.gettimeout())

    def test_http_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url)
                self.addCleanup(u.close)
            finally:
                # Always restore the process-wide default timeout.
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.raw._sock.gettimeout(), 60)

    def test_http_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url, timeout=None)
                self.addCleanup(u.close)
            finally:
                # Always restore the process-wide default timeout.
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.raw._sock.gettimeout())

    def test_http_timeout(self):
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            u = _urlopen_with_retry(url, timeout=120)
            self.addCleanup(u.close)
            self.assertEqual(u.fp.raw._sock.gettimeout(), 120)

    FTP_HOST = 'ftp://www.pythontest.net/'

    @skip_ftp_test_on_travis
    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
            u = _urlopen_with_retry(self.FTP_HOST)
            self.addCleanup(u.close)
            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())

    @skip_ftp_test_on_travis
    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST)
                self.addCleanup(u.close)
            finally:
                # Always restore the process-wide default timeout.
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)

    @skip_ftp_test_on_travis
    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
                self.addCleanup(u.close)
            finally:
                # Always restore the process-wide default timeout.
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())

    @skip_ftp_test_on_travis
    def test_ftp_timeout(self):
        with socket_helper.transient_internet(self.FTP_HOST):
            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
            self.addCleanup(u.close)
            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)


if __name__ == "__main__":
    unittest.main()