1import unittest
2from test import support
3from test.support import socket_helper
4from test.test_urllib2 import sanepathname2url
5
6import os
7import socket
8import urllib.error
9import urllib.request
10import sys
11
12support.requires("network")
13
14
15def _retry_thrice(func, exc, *args, **kwargs):
16    for i in range(3):
17        try:
18            return func(*args, **kwargs)
19        except exc as e:
20            last_exc = e
21            continue
22    raise last_exc
23
24def _wrap_with_retry_thrice(func, exc):
25    def wrapped(*args, **kwargs):
26        return _retry_thrice(func, exc, *args, **kwargs)
27    return wrapped
28
29# bpo-35411: FTP tests of test_urllib2net randomly fail
30# with "425 Security: Bad IP connecting" on Travis CI
31skip_ftp_test_on_travis = unittest.skipIf('TRAVIS' in os.environ,
32                                          'bpo-35411: skip FTP test '
33                                          'on Travis CI')
34
35
36# Connecting to remote hosts is flaky.  Make it more robust by retrying
37# the connection several times.
38_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
39                                              urllib.error.URLError)
40
41
42class AuthTests(unittest.TestCase):
43    """Tests urllib2 authentication features."""
44
45## Disabled at the moment since there is no page under python.org which
46## could be used to HTTP authentication.
47#
48#    def test_basic_auth(self):
49#        import http.client
50#
51#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
52#        test_hostport = "www.python.org"
53#        test_realm = 'Test Realm'
54#        test_user = 'test.test_urllib2net'
55#        test_password = 'blah'
56#
57#        # failure
58#        try:
59#            _urlopen_with_retry(test_url)
60#        except urllib2.HTTPError, exc:
61#            self.assertEqual(exc.code, 401)
62#        else:
63#            self.fail("urlopen() should have failed with 401")
64#
65#        # success
66#        auth_handler = urllib2.HTTPBasicAuthHandler()
67#        auth_handler.add_password(test_realm, test_hostport,
68#                                  test_user, test_password)
69#        opener = urllib2.build_opener(auth_handler)
70#        f = opener.open('http://localhost/')
71#        response = _urlopen_with_retry("http://www.python.org/")
72#
73#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
74#        # reasons, let's not implement it!  (it's already implemented for proxy
75#        # specification strings (that is, URLs or authorities specifying a
76#        # proxy), so we must keep that)
77#        self.assertRaises(http.client.InvalidURL,
78#                          urllib2.urlopen, "http://evil:thing@example.com")
79
80
81class CloseSocketTest(unittest.TestCase):
82
83    def test_close(self):
84        # clear _opener global variable
85        self.addCleanup(urllib.request.urlcleanup)
86
87        # calling .close() on urllib2's response objects should close the
88        # underlying socket
89        url = support.TEST_HTTP_URL
90        with socket_helper.transient_internet(url):
91            response = _urlopen_with_retry(url)
92            sock = response.fp
93            self.assertFalse(sock.closed)
94            response.close()
95            self.assertTrue(sock.closed)
96
97class OtherNetworkTests(unittest.TestCase):
98    def setUp(self):
99        if 0:  # for debugging
100            import logging
101            logger = logging.getLogger("test_urllib2net")
102            logger.addHandler(logging.StreamHandler())
103
104    # XXX The rest of these tests aren't very good -- they don't check much.
105    # They do sometimes catch some major disasters, though.
106
107    @skip_ftp_test_on_travis
108    def test_ftp(self):
109        urls = [
110            'ftp://www.pythontest.net/README',
111            ('ftp://www.pythontest.net/non-existent-file',
112             None, urllib.error.URLError),
113            ]
114        self._test_urls(urls, self._extra_handlers())
115
116    def test_file(self):
117        TESTFN = support.TESTFN
118        f = open(TESTFN, 'w')
119        try:
120            f.write('hi there\n')
121            f.close()
122            urls = [
123                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
124                ('file:///nonsensename/etc/passwd', None,
125                 urllib.error.URLError),
126                ]
127            self._test_urls(urls, self._extra_handlers(), retry=True)
128        finally:
129            os.remove(TESTFN)
130
131        self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file')
132
133    # XXX Following test depends on machine configurations that are internal
134    # to CNRI.  Need to set up a public server with the right authentication
135    # configuration for test purposes.
136
137##     def test_cnri(self):
138##         if socket.gethostname() == 'bitdiddle':
139##             localhost = 'bitdiddle.cnri.reston.va.us'
140##         elif socket.gethostname() == 'bitdiddle.concentric.net':
141##             localhost = 'localhost'
142##         else:
143##             localhost = None
144##         if localhost is not None:
145##             urls = [
146##                 'file://%s/etc/passwd' % localhost,
147##                 'http://%s/simple/' % localhost,
148##                 'http://%s/digest/' % localhost,
149##                 'http://%s/not/found.h' % localhost,
150##                 ]
151
152##             bauth = HTTPBasicAuthHandler()
153##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
154##                                'password')
155##             dauth = HTTPDigestAuthHandler()
156##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
157##                                'password')
158
159##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
160
161    def test_urlwithfrag(self):
162        urlwith_frag = "http://www.pythontest.net/index.html#frag"
163        with socket_helper.transient_internet(urlwith_frag):
164            req = urllib.request.Request(urlwith_frag)
165            res = urllib.request.urlopen(req)
166            self.assertEqual(res.geturl(),
167                    "http://www.pythontest.net/index.html#frag")
168
169    def test_redirect_url_withfrag(self):
170        redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
171        with socket_helper.transient_internet(redirect_url_with_frag):
172            req = urllib.request.Request(redirect_url_with_frag)
173            res = urllib.request.urlopen(req)
174            self.assertEqual(res.geturl(),
175                    "http://www.pythontest.net/elsewhere/#frag")
176
177    def test_custom_headers(self):
178        url = support.TEST_HTTP_URL
179        with socket_helper.transient_internet(url):
180            opener = urllib.request.build_opener()
181            request = urllib.request.Request(url)
182            self.assertFalse(request.header_items())
183            opener.open(request)
184            self.assertTrue(request.header_items())
185            self.assertTrue(request.has_header('User-agent'))
186            request.add_header('User-Agent','Test-Agent')
187            opener.open(request)
188            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
189
190    @unittest.skip('XXX: http://www.imdb.com is gone')
191    def test_sites_no_connection_close(self):
192        # Some sites do not send Connection: close header.
193        # Verify that those work properly. (#issue12576)
194
195        URL = 'http://www.imdb.com' # mangles Connection:close
196
197        with socket_helper.transient_internet(URL):
198            try:
199                with urllib.request.urlopen(URL) as res:
200                    pass
201            except ValueError:
202                self.fail("urlopen failed for site not sending \
203                           Connection:close")
204            else:
205                self.assertTrue(res)
206
207            req = urllib.request.urlopen(URL)
208            res = req.read()
209            self.assertTrue(res)
210
211    def _test_urls(self, urls, handlers, retry=True):
212        import time
213        import logging
214        debug = logging.getLogger("test_urllib2").debug
215
216        urlopen = urllib.request.build_opener(*handlers).open
217        if retry:
218            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)
219
220        for url in urls:
221            with self.subTest(url=url):
222                if isinstance(url, tuple):
223                    url, req, expected_err = url
224                else:
225                    req = expected_err = None
226
227                with socket_helper.transient_internet(url):
228                    try:
229                        f = urlopen(url, req, support.INTERNET_TIMEOUT)
230                    # urllib.error.URLError is a subclass of OSError
231                    except OSError as err:
232                        if expected_err:
233                            msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
234                                   (expected_err, url, req, type(err), err))
235                            self.assertIsInstance(err, expected_err, msg)
236                        else:
237                            raise
238                    else:
239                        try:
240                            with support.time_out, \
241                                 support.socket_peer_reset, \
242                                 support.ioerror_peer_reset:
243                                buf = f.read()
244                                debug("read %d bytes" % len(buf))
245                        except socket.timeout:
246                            print("<timeout: %s>" % url, file=sys.stderr)
247                        f.close()
248                time.sleep(0.1)
249
250    def _extra_handlers(self):
251        handlers = []
252
253        cfh = urllib.request.CacheFTPHandler()
254        self.addCleanup(cfh.clear_cache)
255        cfh.setTimeout(1)
256        handlers.append(cfh)
257
258        return handlers
259
260
261class TimeoutTest(unittest.TestCase):
262    def setUp(self):
263        # clear _opener global variable
264        self.addCleanup(urllib.request.urlcleanup)
265
266    def test_http_basic(self):
267        self.assertIsNone(socket.getdefaulttimeout())
268        url = support.TEST_HTTP_URL
269        with socket_helper.transient_internet(url, timeout=None):
270            u = _urlopen_with_retry(url)
271            self.addCleanup(u.close)
272            self.assertIsNone(u.fp.raw._sock.gettimeout())
273
274    def test_http_default_timeout(self):
275        self.assertIsNone(socket.getdefaulttimeout())
276        url = support.TEST_HTTP_URL
277        with socket_helper.transient_internet(url):
278            socket.setdefaulttimeout(60)
279            try:
280                u = _urlopen_with_retry(url)
281                self.addCleanup(u.close)
282            finally:
283                socket.setdefaulttimeout(None)
284            self.assertEqual(u.fp.raw._sock.gettimeout(), 60)
285
286    def test_http_no_timeout(self):
287        self.assertIsNone(socket.getdefaulttimeout())
288        url = support.TEST_HTTP_URL
289        with socket_helper.transient_internet(url):
290            socket.setdefaulttimeout(60)
291            try:
292                u = _urlopen_with_retry(url, timeout=None)
293                self.addCleanup(u.close)
294            finally:
295                socket.setdefaulttimeout(None)
296            self.assertIsNone(u.fp.raw._sock.gettimeout())
297
298    def test_http_timeout(self):
299        url = support.TEST_HTTP_URL
300        with socket_helper.transient_internet(url):
301            u = _urlopen_with_retry(url, timeout=120)
302            self.addCleanup(u.close)
303            self.assertEqual(u.fp.raw._sock.gettimeout(), 120)
304
305    FTP_HOST = 'ftp://www.pythontest.net/'
306
307    @skip_ftp_test_on_travis
308    def test_ftp_basic(self):
309        self.assertIsNone(socket.getdefaulttimeout())
310        with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
311            u = _urlopen_with_retry(self.FTP_HOST)
312            self.addCleanup(u.close)
313            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
314
315    @skip_ftp_test_on_travis
316    def test_ftp_default_timeout(self):
317        self.assertIsNone(socket.getdefaulttimeout())
318        with socket_helper.transient_internet(self.FTP_HOST):
319            socket.setdefaulttimeout(60)
320            try:
321                u = _urlopen_with_retry(self.FTP_HOST)
322                self.addCleanup(u.close)
323            finally:
324                socket.setdefaulttimeout(None)
325            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
326
327    @skip_ftp_test_on_travis
328    def test_ftp_no_timeout(self):
329        self.assertIsNone(socket.getdefaulttimeout())
330        with socket_helper.transient_internet(self.FTP_HOST):
331            socket.setdefaulttimeout(60)
332            try:
333                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
334                self.addCleanup(u.close)
335            finally:
336                socket.setdefaulttimeout(None)
337            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
338
339    @skip_ftp_test_on_travis
340    def test_ftp_timeout(self):
341        with socket_helper.transient_internet(self.FTP_HOST):
342            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
343            self.addCleanup(u.close)
344            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
345
346
347if __name__ == "__main__":
348    unittest.main()
349