1#!/usr/bin/env python
2
3import unittest
4from test import test_support
5from test.test_urllib2 import sanepathname2url
6
7import socket
8import urllib2
9import os
10import sys
11
12TIMEOUT = 60  # seconds
13
14
15def _retry_thrice(func, exc, *args, **kwargs):
16    for i in range(3):
17        try:
18            return func(*args, **kwargs)
19        except exc, last_exc:
20            continue
21        except:
22            raise
23    raise last_exc
24
25def _wrap_with_retry_thrice(func, exc):
26    def wrapped(*args, **kwargs):
27        return _retry_thrice(func, exc, *args, **kwargs)
28    return wrapped
29
30# Connecting to remote hosts is flaky.  Make it more robust by retrying
31# the connection several times.
32_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
33
34
35class AuthTests(unittest.TestCase):
36    """Tests urllib2 authentication features."""
37
38## Disabled at the moment since there is no page under python.org which
39## could be used to HTTP authentication.
40#
41#    def test_basic_auth(self):
42#        import httplib
43#
44#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
45#        test_hostport = "www.python.org"
46#        test_realm = 'Test Realm'
47#        test_user = 'test.test_urllib2net'
48#        test_password = 'blah'
49#
50#        # failure
51#        try:
52#            _urlopen_with_retry(test_url)
53#        except urllib2.HTTPError, exc:
54#            self.assertEqual(exc.code, 401)
55#        else:
56#            self.fail("urlopen() should have failed with 401")
57#
58#        # success
59#        auth_handler = urllib2.HTTPBasicAuthHandler()
60#        auth_handler.add_password(test_realm, test_hostport,
61#                                  test_user, test_password)
62#        opener = urllib2.build_opener(auth_handler)
63#        f = opener.open('http://localhost/')
64#        response = _urlopen_with_retry("http://www.python.org/")
65#
66#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
67#        # reasons, let's not implement it!  (it's already implemented for proxy
68#        # specification strings (that is, URLs or authorities specifying a
69#        # proxy), so we must keep that)
70#        self.assertRaises(httplib.InvalidURL,
71#                          urllib2.urlopen, "http://evil:thing@example.com")
72
73
74class CloseSocketTest(unittest.TestCase):
75
76    def test_close(self):
77        import httplib
78
79        # calling .close() on urllib2's response objects should close the
80        # underlying socket
81
82        # delve deep into response to fetch socket._socketobject
83        response = _urlopen_with_retry("http://www.python.org/")
84        abused_fileobject = response.fp
85        self.assertTrue(abused_fileobject.__class__ is socket._fileobject)
86        httpresponse = abused_fileobject._sock
87        self.assertTrue(httpresponse.__class__ is httplib.HTTPResponse)
88        fileobject = httpresponse.fp
89        self.assertTrue(fileobject.__class__ is socket._fileobject)
90
91        self.assertTrue(not fileobject.closed)
92        response.close()
93        self.assertTrue(fileobject.closed)
94
95class OtherNetworkTests(unittest.TestCase):
96    def setUp(self):
97        if 0:  # for debugging
98            import logging
99            logger = logging.getLogger("test_urllib2net")
100            logger.addHandler(logging.StreamHandler())
101
102    # XXX The rest of these tests aren't very good -- they don't check much.
103    # They do sometimes catch some major disasters, though.
104
105    def test_ftp(self):
106        urls = [
107            'ftp://ftp.kernel.org/pub/linux/kernel/README',
108            'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file',
109            #'ftp://ftp.kernel.org/pub/leenox/kernel/test',
110            'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
111                '/research-reports/00README-Legal-Rules-Regs',
112            ]
113        self._test_urls(urls, self._extra_handlers())
114
115    def test_file(self):
116        TESTFN = test_support.TESTFN
117        f = open(TESTFN, 'w')
118        try:
119            f.write('hi there\n')
120            f.close()
121            urls = [
122                'file:'+sanepathname2url(os.path.abspath(TESTFN)),
123                ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
124                ]
125            self._test_urls(urls, self._extra_handlers(), retry=True)
126        finally:
127            os.remove(TESTFN)
128
129    # XXX Following test depends on machine configurations that are internal
130    # to CNRI.  Need to set up a public server with the right authentication
131    # configuration for test purposes.
132
133##     def test_cnri(self):
134##         if socket.gethostname() == 'bitdiddle':
135##             localhost = 'bitdiddle.cnri.reston.va.us'
136##         elif socket.gethostname() == 'bitdiddle.concentric.net':
137##             localhost = 'localhost'
138##         else:
139##             localhost = None
140##         if localhost is not None:
141##             urls = [
142##                 'file://%s/etc/passwd' % localhost,
143##                 'http://%s/simple/' % localhost,
144##                 'http://%s/digest/' % localhost,
145##                 'http://%s/not/found.h' % localhost,
146##                 ]
147
148##             bauth = HTTPBasicAuthHandler()
149##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
150##                                'password')
151##             dauth = HTTPDigestAuthHandler()
152##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
153##                                'password')
154
155##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
156
157    def test_urlwithfrag(self):
158        urlwith_frag = "http://docs.python.org/glossary.html#glossary"
159        with test_support.transient_internet(urlwith_frag):
160            req = urllib2.Request(urlwith_frag)
161            res = urllib2.urlopen(req)
162            self.assertEqual(res.geturl(),
163                    "http://docs.python.org/glossary.html#glossary")
164
165    def test_fileno(self):
166        req = urllib2.Request("http://www.python.org")
167        opener = urllib2.build_opener()
168        res = opener.open(req)
169        try:
170            res.fileno()
171        except AttributeError:
172            self.fail("HTTPResponse object should return a valid fileno")
173        finally:
174            res.close()
175
176    def test_custom_headers(self):
177        url = "http://www.example.com"
178        with test_support.transient_internet(url):
179            opener = urllib2.build_opener()
180            request = urllib2.Request(url)
181            self.assertFalse(request.header_items())
182            opener.open(request)
183            self.assertTrue(request.header_items())
184            self.assertTrue(request.has_header('User-agent'))
185            request.add_header('User-Agent','Test-Agent')
186            opener.open(request)
187            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
188
189    def _test_urls(self, urls, handlers, retry=True):
190        import time
191        import logging
192        debug = logging.getLogger("test_urllib2").debug
193
194        urlopen = urllib2.build_opener(*handlers).open
195        if retry:
196            urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
197
198        for url in urls:
199            if isinstance(url, tuple):
200                url, req, expected_err = url
201            else:
202                req = expected_err = None
203            with test_support.transient_internet(url):
204                debug(url)
205                try:
206                    f = urlopen(url, req, TIMEOUT)
207                except EnvironmentError as err:
208                    debug(err)
209                    if expected_err:
210                        msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
211                               (expected_err, url, req, type(err), err))
212                        self.assertIsInstance(err, expected_err, msg)
213                except urllib2.URLError as err:
214                    if isinstance(err[0], socket.timeout):
215                        print >>sys.stderr, "<timeout: %s>" % url
216                        continue
217                    else:
218                        raise
219                else:
220                    try:
221                        with test_support.transient_internet(url):
222                            buf = f.read()
223                            debug("read %d bytes" % len(buf))
224                    except socket.timeout:
225                        print >>sys.stderr, "<timeout: %s>" % url
226                    f.close()
227            debug("******** next url coming up...")
228            time.sleep(0.1)
229
230    def _extra_handlers(self):
231        handlers = []
232
233        cfh = urllib2.CacheFTPHandler()
234        cfh.setTimeout(1)
235        handlers.append(cfh)
236
237        return handlers
238
239
240class TimeoutTest(unittest.TestCase):
241    def test_http_basic(self):
242        self.assertTrue(socket.getdefaulttimeout() is None)
243        url = "http://www.python.org"
244        with test_support.transient_internet(url, timeout=None):
245            u = _urlopen_with_retry(url)
246            self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
247
248    def test_http_default_timeout(self):
249        self.assertTrue(socket.getdefaulttimeout() is None)
250        url = "http://www.python.org"
251        with test_support.transient_internet(url):
252            socket.setdefaulttimeout(60)
253            try:
254                u = _urlopen_with_retry(url)
255            finally:
256                socket.setdefaulttimeout(None)
257            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
258
259    def test_http_no_timeout(self):
260        self.assertTrue(socket.getdefaulttimeout() is None)
261        url = "http://www.python.org"
262        with test_support.transient_internet(url):
263            socket.setdefaulttimeout(60)
264            try:
265                u = _urlopen_with_retry(url, timeout=None)
266            finally:
267                socket.setdefaulttimeout(None)
268            self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
269
270    def test_http_timeout(self):
271        url = "http://www.python.org"
272        with test_support.transient_internet(url):
273            u = _urlopen_with_retry(url, timeout=120)
274            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
275
276    FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"
277
278    def test_ftp_basic(self):
279        self.assertTrue(socket.getdefaulttimeout() is None)
280        with test_support.transient_internet(self.FTP_HOST, timeout=None):
281            u = _urlopen_with_retry(self.FTP_HOST)
282            self.assertTrue(u.fp.fp._sock.gettimeout() is None)
283
284    def test_ftp_default_timeout(self):
285        self.assertTrue(socket.getdefaulttimeout() is None)
286        with test_support.transient_internet(self.FTP_HOST):
287            socket.setdefaulttimeout(60)
288            try:
289                u = _urlopen_with_retry(self.FTP_HOST)
290            finally:
291                socket.setdefaulttimeout(None)
292            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
293
294    def test_ftp_no_timeout(self):
295        self.assertTrue(socket.getdefaulttimeout() is None)
296        with test_support.transient_internet(self.FTP_HOST):
297            socket.setdefaulttimeout(60)
298            try:
299                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
300            finally:
301                socket.setdefaulttimeout(None)
302            self.assertTrue(u.fp.fp._sock.gettimeout() is None)
303
304    def test_ftp_timeout(self):
305        with test_support.transient_internet(self.FTP_HOST):
306            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
307            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
308
309
310def test_main():
311    test_support.requires("network")
312    test_support.run_unittest(AuthTests,
313                              OtherNetworkTests,
314                              CloseSocketTest,
315                              TimeoutTest,
316                              )
317
318if __name__ == "__main__":
319    test_main()
320