1#!/usr/bin/env python 2 3import unittest 4from test import test_support 5from test.test_urllib2 import sanepathname2url 6 7import socket 8import urllib2 9import os 10import sys 11 12TIMEOUT = 60 # seconds 13 14 15def _retry_thrice(func, exc, *args, **kwargs): 16 for i in range(3): 17 try: 18 return func(*args, **kwargs) 19 except exc, last_exc: 20 continue 21 except: 22 raise 23 raise last_exc 24 25def _wrap_with_retry_thrice(func, exc): 26 def wrapped(*args, **kwargs): 27 return _retry_thrice(func, exc, *args, **kwargs) 28 return wrapped 29 30# Connecting to remote hosts is flaky. Make it more robust by retrying 31# the connection several times. 32_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError) 33 34 35class AuthTests(unittest.TestCase): 36 """Tests urllib2 authentication features.""" 37 38## Disabled at the moment since there is no page under python.org which 39## could be used to HTTP authentication. 40# 41# def test_basic_auth(self): 42# import httplib 43# 44# test_url = "http://www.python.org/test/test_urllib2/basic_auth" 45# test_hostport = "www.python.org" 46# test_realm = 'Test Realm' 47# test_user = 'test.test_urllib2net' 48# test_password = 'blah' 49# 50# # failure 51# try: 52# _urlopen_with_retry(test_url) 53# except urllib2.HTTPError, exc: 54# self.assertEqual(exc.code, 401) 55# else: 56# self.fail("urlopen() should have failed with 401") 57# 58# # success 59# auth_handler = urllib2.HTTPBasicAuthHandler() 60# auth_handler.add_password(test_realm, test_hostport, 61# test_user, test_password) 62# opener = urllib2.build_opener(auth_handler) 63# f = opener.open('http://localhost/') 64# response = _urlopen_with_retry("http://www.python.org/") 65# 66# # The 'userinfo' URL component is deprecated by RFC 3986 for security 67# # reasons, let's not implement it! (it's already implemented for proxy 68# # specification strings (that is, URLs or authorities specifying a 69# # proxy), so we must keep that) 70# self.assertRaises(httplib.InvalidURL, 71# urllib2.urlopen, "http://evil:thing@example.com") 72 73 74class CloseSocketTest(unittest.TestCase): 75 76 def test_close(self): 77 import httplib 78 79 # calling .close() on urllib2's response objects should close the 80 # underlying socket 81 82 # delve deep into response to fetch socket._socketobject 83 response = _urlopen_with_retry("http://www.python.org/") 84 abused_fileobject = response.fp 85 self.assertTrue(abused_fileobject.__class__ is socket._fileobject) 86 httpresponse = abused_fileobject._sock 87 self.assertTrue(httpresponse.__class__ is httplib.HTTPResponse) 88 fileobject = httpresponse.fp 89 self.assertTrue(fileobject.__class__ is socket._fileobject) 90 91 self.assertTrue(not fileobject.closed) 92 response.close() 93 self.assertTrue(fileobject.closed) 94 95class OtherNetworkTests(unittest.TestCase): 96 def setUp(self): 97 if 0: # for debugging 98 import logging 99 logger = logging.getLogger("test_urllib2net") 100 logger.addHandler(logging.StreamHandler()) 101 102 # XXX The rest of these tests aren't very good -- they don't check much. 103 # They do sometimes catch some major disasters, though. 104 105 def test_ftp(self): 106 urls = [ 107 'ftp://ftp.kernel.org/pub/linux/kernel/README', 108 'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file', 109 #'ftp://ftp.kernel.org/pub/leenox/kernel/test', 110 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC' 111 '/research-reports/00README-Legal-Rules-Regs', 112 ] 113 self._test_urls(urls, self._extra_handlers()) 114 115 def test_file(self): 116 TESTFN = test_support.TESTFN 117 f = open(TESTFN, 'w') 118 try: 119 f.write('hi there\n') 120 f.close() 121 urls = [ 122 'file:'+sanepathname2url(os.path.abspath(TESTFN)), 123 ('file:///nonsensename/etc/passwd', None, urllib2.URLError), 124 ] 125 self._test_urls(urls, self._extra_handlers(), retry=True) 126 finally: 127 os.remove(TESTFN) 128 129 # XXX Following test depends on machine configurations that are internal 130 # to CNRI. Need to set up a public server with the right authentication 131 # configuration for test purposes. 132 133## def test_cnri(self): 134## if socket.gethostname() == 'bitdiddle': 135## localhost = 'bitdiddle.cnri.reston.va.us' 136## elif socket.gethostname() == 'bitdiddle.concentric.net': 137## localhost = 'localhost' 138## else: 139## localhost = None 140## if localhost is not None: 141## urls = [ 142## 'file://%s/etc/passwd' % localhost, 143## 'http://%s/simple/' % localhost, 144## 'http://%s/digest/' % localhost, 145## 'http://%s/not/found.h' % localhost, 146## ] 147 148## bauth = HTTPBasicAuthHandler() 149## bauth.add_password('basic_test_realm', localhost, 'jhylton', 150## 'password') 151## dauth = HTTPDigestAuthHandler() 152## dauth.add_password('digest_test_realm', localhost, 'jhylton', 153## 'password') 154 155## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) 156 157 def test_urlwithfrag(self): 158 urlwith_frag = "http://docs.python.org/glossary.html#glossary" 159 with test_support.transient_internet(urlwith_frag): 160 req = urllib2.Request(urlwith_frag) 161 res = urllib2.urlopen(req) 162 self.assertEqual(res.geturl(), 163 "http://docs.python.org/glossary.html#glossary") 164 165 def test_fileno(self): 166 req = urllib2.Request("http://www.python.org") 167 opener = urllib2.build_opener() 168 res = opener.open(req) 169 try: 170 res.fileno() 171 except AttributeError: 172 self.fail("HTTPResponse object should return a valid fileno") 173 finally: 174 res.close() 175 176 def test_custom_headers(self): 177 url = "http://www.example.com" 178 with test_support.transient_internet(url): 179 opener = urllib2.build_opener() 180 request = urllib2.Request(url) 181 self.assertFalse(request.header_items()) 182 opener.open(request) 183 self.assertTrue(request.header_items()) 184 self.assertTrue(request.has_header('User-agent')) 185 request.add_header('User-Agent','Test-Agent') 186 opener.open(request) 187 self.assertEqual(request.get_header('User-agent'),'Test-Agent') 188 189 def _test_urls(self, urls, handlers, retry=True): 190 import time 191 import logging 192 debug = logging.getLogger("test_urllib2").debug 193 194 urlopen = urllib2.build_opener(*handlers).open 195 if retry: 196 urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError) 197 198 for url in urls: 199 if isinstance(url, tuple): 200 url, req, expected_err = url 201 else: 202 req = expected_err = None 203 with test_support.transient_internet(url): 204 debug(url) 205 try: 206 f = urlopen(url, req, TIMEOUT) 207 except EnvironmentError as err: 208 debug(err) 209 if expected_err: 210 msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" % 211 (expected_err, url, req, type(err), err)) 212 self.assertIsInstance(err, expected_err, msg) 213 except urllib2.URLError as err: 214 if isinstance(err[0], socket.timeout): 215 print >>sys.stderr, "<timeout: %s>" % url 216 continue 217 else: 218 raise 219 else: 220 try: 221 with test_support.transient_internet(url): 222 buf = f.read() 223 debug("read %d bytes" % len(buf)) 224 except socket.timeout: 225 print >>sys.stderr, "<timeout: %s>" % url 226 f.close() 227 debug("******** next url coming up...") 228 time.sleep(0.1) 229 230 def _extra_handlers(self): 231 handlers = [] 232 233 cfh = urllib2.CacheFTPHandler() 234 cfh.setTimeout(1) 235 handlers.append(cfh) 236 237 return handlers 238 239 240class TimeoutTest(unittest.TestCase): 241 def test_http_basic(self): 242 self.assertTrue(socket.getdefaulttimeout() is None) 243 url = "http://www.python.org" 244 with test_support.transient_internet(url, timeout=None): 245 u = _urlopen_with_retry(url) 246 self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None) 247 248 def test_http_default_timeout(self): 249 self.assertTrue(socket.getdefaulttimeout() is None) 250 url = "http://www.python.org" 251 with test_support.transient_internet(url): 252 socket.setdefaulttimeout(60) 253 try: 254 u = _urlopen_with_retry(url) 255 finally: 256 socket.setdefaulttimeout(None) 257 self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60) 258 259 def test_http_no_timeout(self): 260 self.assertTrue(socket.getdefaulttimeout() is None) 261 url = "http://www.python.org" 262 with test_support.transient_internet(url): 263 socket.setdefaulttimeout(60) 264 try: 265 u = _urlopen_with_retry(url, timeout=None) 266 finally: 267 socket.setdefaulttimeout(None) 268 self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None) 269 270 def test_http_timeout(self): 271 url = "http://www.python.org" 272 with test_support.transient_internet(url): 273 u = _urlopen_with_retry(url, timeout=120) 274 self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120) 275 276 FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/" 277 278 def test_ftp_basic(self): 279 self.assertTrue(socket.getdefaulttimeout() is None) 280 with test_support.transient_internet(self.FTP_HOST, timeout=None): 281 u = _urlopen_with_retry(self.FTP_HOST) 282 self.assertTrue(u.fp.fp._sock.gettimeout() is None) 283 284 def test_ftp_default_timeout(self): 285 self.assertTrue(socket.getdefaulttimeout() is None) 286 with test_support.transient_internet(self.FTP_HOST): 287 socket.setdefaulttimeout(60) 288 try: 289 u = _urlopen_with_retry(self.FTP_HOST) 290 finally: 291 socket.setdefaulttimeout(None) 292 self.assertEqual(u.fp.fp._sock.gettimeout(), 60) 293 294 def test_ftp_no_timeout(self): 295 self.assertTrue(socket.getdefaulttimeout() is None) 296 with test_support.transient_internet(self.FTP_HOST): 297 socket.setdefaulttimeout(60) 298 try: 299 u = _urlopen_with_retry(self.FTP_HOST, timeout=None) 300 finally: 301 socket.setdefaulttimeout(None) 302 self.assertTrue(u.fp.fp._sock.gettimeout() is None) 303 304 def test_ftp_timeout(self): 305 with test_support.transient_internet(self.FTP_HOST): 306 u = _urlopen_with_retry(self.FTP_HOST, timeout=60) 307 self.assertEqual(u.fp.fp._sock.gettimeout(), 60) 308 309 310def test_main(): 311 test_support.requires("network") 312 test_support.run_unittest(AuthTests, 313 OtherNetworkTests, 314 CloseSocketTest, 315 TimeoutTest, 316 ) 317 318if __name__ == "__main__": 319 test_main() 320