1import unittest
2from test import test_support
3from test.test_urllib2 import sanepathname2url
4
5import socket
6import urllib2
7import os
8import sys
9
10TIMEOUT = 60  # seconds
11
12
13def _retry_thrice(func, exc, *args, **kwargs):
14    for i in range(3):
15        try:
16            return func(*args, **kwargs)
17        except exc, last_exc:
18            continue
19        except:
20            raise
21    raise last_exc
22
23def _wrap_with_retry_thrice(func, exc):
24    def wrapped(*args, **kwargs):
25        return _retry_thrice(func, exc, *args, **kwargs)
26    return wrapped
27
28# Connecting to remote hosts is flaky.  Make it more robust by retrying
29# the connection several times.
30_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
31
32
33class AuthTests(unittest.TestCase):
34    """Tests urllib2 authentication features."""
35
36## Disabled at the moment since there is no page under python.org which
37## could be used to HTTP authentication.
38#
39#    def test_basic_auth(self):
40#        import httplib
41#
42#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
43#        test_hostport = "www.python.org"
44#        test_realm = 'Test Realm'
45#        test_user = 'test.test_urllib2net'
46#        test_password = 'blah'
47#
48#        # failure
49#        try:
50#            _urlopen_with_retry(test_url)
51#        except urllib2.HTTPError, exc:
52#            self.assertEqual(exc.code, 401)
53#        else:
54#            self.fail("urlopen() should have failed with 401")
55#
56#        # success
57#        auth_handler = urllib2.HTTPBasicAuthHandler()
58#        auth_handler.add_password(test_realm, test_hostport,
59#                                  test_user, test_password)
60#        opener = urllib2.build_opener(auth_handler)
61#        f = opener.open('http://localhost/')
62#        response = _urlopen_with_retry("http://www.python.org/")
63#
64#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
65#        # reasons, let's not implement it!  (it's already implemented for proxy
66#        # specification strings (that is, URLs or authorities specifying a
67#        # proxy), so we must keep that)
68#        self.assertRaises(httplib.InvalidURL,
69#                          urllib2.urlopen, "http://evil:thing@example.com")
70
71
72class CloseSocketTest(unittest.TestCase):
73
74    def test_close(self):
75        import httplib
76
77        # calling .close() on urllib2's response objects should close the
78        # underlying socket
79
80        # delve deep into response to fetch socket._socketobject
81        response = _urlopen_with_retry("http://www.example.com/")
82        abused_fileobject = response.fp
83        self.assertIs(abused_fileobject.__class__, socket._fileobject)
84        httpresponse = abused_fileobject._sock
85        self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
86        fileobject = httpresponse.fp
87        self.assertIs(fileobject.__class__, socket._fileobject)
88
89        self.assertTrue(not fileobject.closed)
90        response.close()
91        self.assertTrue(fileobject.closed)
92
93class OtherNetworkTests(unittest.TestCase):
94    def setUp(self):
95        if 0:  # for debugging
96            import logging
97            logger = logging.getLogger("test_urllib2net")
98            logger.addHandler(logging.StreamHandler())
99
100    # XXX The rest of these tests aren't very good -- they don't check much.
101    # They do sometimes catch some major disasters, though.
102
103    def test_ftp(self):
104        urls = [
105            'ftp://www.pythontest.net/README',
106            ('ftp://www.pythontest.net/non-existent-file',
107             None, urllib2.URLError),
108            ]
109        self._test_urls(urls, self._extra_handlers())
110
111    def test_file(self):
112        TESTFN = test_support.TESTFN
113        f = open(TESTFN, 'w')
114        try:
115            f.write('hi there\n')
116            f.close()
117            urls = [
118                'file:'+sanepathname2url(os.path.abspath(TESTFN)),
119                ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
120                ]
121            self._test_urls(urls, self._extra_handlers(), retry=True)
122        finally:
123            os.remove(TESTFN)
124
125        self.assertRaises(ValueError, urllib2.urlopen,'./relative_path/to/file')
126
127    # XXX Following test depends on machine configurations that are internal
128    # to CNRI.  Need to set up a public server with the right authentication
129    # configuration for test purposes.
130
131##     def test_cnri(self):
132##         if socket.gethostname() == 'bitdiddle':
133##             localhost = 'bitdiddle.cnri.reston.va.us'
134##         elif socket.gethostname() == 'bitdiddle.concentric.net':
135##             localhost = 'localhost'
136##         else:
137##             localhost = None
138##         if localhost is not None:
139##             urls = [
140##                 'file://%s/etc/passwd' % localhost,
141##                 'http://%s/simple/' % localhost,
142##                 'http://%s/digest/' % localhost,
143##                 'http://%s/not/found.h' % localhost,
144##                 ]
145
146##             bauth = HTTPBasicAuthHandler()
147##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
148##                                'password')
149##             dauth = HTTPDigestAuthHandler()
150##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
151##                                'password')
152
153##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
154
155    def test_urlwithfrag(self):
156        urlwith_frag = "http://www.pythontest.net/index.html#frag"
157        with test_support.transient_internet(urlwith_frag):
158            req = urllib2.Request(urlwith_frag)
159            res = urllib2.urlopen(req)
160            self.assertEqual(res.geturl(),
161                    "http://www.pythontest.net/index.html#frag")
162
163    def test_fileno(self):
164        req = urllib2.Request("http://www.example.com")
165        opener = urllib2.build_opener()
166        res = opener.open(req)
167        try:
168            res.fileno()
169        except AttributeError:
170            self.fail("HTTPResponse object should return a valid fileno")
171        finally:
172            res.close()
173
174    def test_custom_headers(self):
175        url = "http://www.example.com"
176        with test_support.transient_internet(url):
177            opener = urllib2.build_opener()
178            request = urllib2.Request(url)
179            self.assertFalse(request.header_items())
180            opener.open(request)
181            self.assertTrue(request.header_items())
182            self.assertTrue(request.has_header('User-agent'))
183            request.add_header('User-Agent','Test-Agent')
184            opener.open(request)
185            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
186
187    @unittest.skip('XXX: http://www.imdb.com is gone')
188    def test_sites_no_connection_close(self):
189        # Some sites do not send Connection: close header.
190        # Verify that those work properly. (#issue12576)
191
192        URL = 'http://www.imdb.com' # No Connection:close
193        with test_support.transient_internet(URL):
194            req = urllib2.urlopen(URL)
195            res = req.read()
196            self.assertTrue(res)
197
198    def _test_urls(self, urls, handlers, retry=True):
199        import time
200        import logging
201        debug = logging.getLogger("test_urllib2").debug
202
203        urlopen = urllib2.build_opener(*handlers).open
204        if retry:
205            urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
206
207        for url in urls:
208            if isinstance(url, tuple):
209                url, req, expected_err = url
210            else:
211                req = expected_err = None
212            with test_support.transient_internet(url):
213                debug(url)
214                try:
215                    f = urlopen(url, req, TIMEOUT)
216                except EnvironmentError as err:
217                    debug(err)
218                    if expected_err:
219                        msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
220                               (expected_err, url, req, type(err), err))
221                        self.assertIsInstance(err, expected_err, msg)
222                except urllib2.URLError as err:
223                    if isinstance(err[0], socket.timeout):
224                        print >>sys.stderr, "<timeout: %s>" % url
225                        continue
226                    else:
227                        raise
228                else:
229                    try:
230                        with test_support.transient_internet(url):
231                            buf = f.read()
232                            debug("read %d bytes" % len(buf))
233                    except socket.timeout:
234                        print >>sys.stderr, "<timeout: %s>" % url
235                    f.close()
236            debug("******** next url coming up...")
237            time.sleep(0.1)
238
239    def _extra_handlers(self):
240        handlers = []
241
242        cfh = urllib2.CacheFTPHandler()
243        self.addCleanup(cfh.clear_cache)
244        cfh.setTimeout(1)
245        handlers.append(cfh)
246
247        return handlers
248
249
250class TimeoutTest(unittest.TestCase):
251    def test_http_basic(self):
252        self.assertIsNone(socket.getdefaulttimeout())
253        url = "http://www.example.com"
254        with test_support.transient_internet(url, timeout=None):
255            u = _urlopen_with_retry(url)
256            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
257
258    def test_http_default_timeout(self):
259        self.assertIsNone(socket.getdefaulttimeout())
260        url = "http://www.example.com"
261        with test_support.transient_internet(url):
262            socket.setdefaulttimeout(60)
263            try:
264                u = _urlopen_with_retry(url)
265            finally:
266                socket.setdefaulttimeout(None)
267            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
268
269    def test_http_no_timeout(self):
270        self.assertIsNone(socket.getdefaulttimeout())
271        url = "http://www.example.com"
272        with test_support.transient_internet(url):
273            socket.setdefaulttimeout(60)
274            try:
275                u = _urlopen_with_retry(url, timeout=None)
276            finally:
277                socket.setdefaulttimeout(None)
278            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
279
280    def test_http_timeout(self):
281        url = "http://www.example.com"
282        with test_support.transient_internet(url):
283            u = _urlopen_with_retry(url, timeout=120)
284            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
285
286    FTP_HOST = 'ftp://www.pythontest.net/'
287
288    def test_ftp_basic(self):
289        self.assertIsNone(socket.getdefaulttimeout())
290        with test_support.transient_internet(self.FTP_HOST, timeout=None):
291            u = _urlopen_with_retry(self.FTP_HOST)
292            self.assertIsNone(u.fp.fp._sock.gettimeout())
293
294    def test_ftp_default_timeout(self):
295        self.assertIsNone(socket.getdefaulttimeout())
296        with test_support.transient_internet(self.FTP_HOST):
297            socket.setdefaulttimeout(60)
298            try:
299                u = _urlopen_with_retry(self.FTP_HOST)
300            finally:
301                socket.setdefaulttimeout(None)
302            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
303
304    def test_ftp_no_timeout(self):
305        self.assertIsNone(socket.getdefaulttimeout(),)
306        with test_support.transient_internet(self.FTP_HOST):
307            socket.setdefaulttimeout(60)
308            try:
309                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
310            finally:
311                socket.setdefaulttimeout(None)
312            self.assertIsNone(u.fp.fp._sock.gettimeout())
313
314    def test_ftp_timeout(self):
315        with test_support.transient_internet(self.FTP_HOST):
316            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
317            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
318
319
320def test_main():
321    test_support.requires("network")
322    test_support.run_unittest(AuthTests,
323                              OtherNetworkTests,
324                              CloseSocketTest,
325                              TimeoutTest,
326                              )
327
328if __name__ == "__main__":
329    test_main()
330