1import httplib
2import itertools
3import array
4import StringIO
5import socket
6import errno
7import os
8import tempfile
9
10import unittest
# Short alias; the test classes in this module derive from it.
TestCase = unittest.TestCase
12
13from test import test_support
14
# Directory containing this test module; the certificate files below are
# looked up relative to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Self-signed cert file for self-signed.pythontest.net
CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')

# Host value taken from test_support; used by the tests that open real
# connections.
HOST = test_support.HOST
24
class FakeSocket:
    """In-memory stand-in for a socket.

    Everything passed to sendall() is accumulated in ``data``.  makefile()
    wraps the canned ``text`` in ``fileclass`` and hands that object back;
    its close() is replaced so that closing only sets ``file_closed``.
    """

    def __init__(self, text, fileclass=StringIO.StringIO, host=None, port=None):
        self.text = text
        self.fileclass = fileclass
        self.host = host
        self.port = port
        self.data = ''
        self.file_closed = False

    def sendall(self, data):
        # join() flattens whatever iterable of strings was handed in
        # (a plain string included) before it is appended.
        self.data = self.data + ''.join(data)

    def makefile(self, mode, bufsize=None):
        if mode not in ('r', 'rb'):
            raise httplib.UnimplementedFileMode()
        # keep the file around so we can check how much was read from it
        stream = self.file = self.fileclass(self.text)
        # nerf close(): record the call instead of closing the file
        stream.close = self.file_close
        return stream

    def file_close(self):
        self.file_closed = True

    def close(self):
        pass
50
class EPipeSocket(FakeSocket):
    """FakeSocket whose sendall() fails with EPIPE on a trigger string."""

    def __init__(self, text, pipe_trigger):
        # sendall() raises EPIPE as soon as pipe_trigger shows up in the
        # data it is given.
        FakeSocket.__init__(self, text)
        self.pipe_trigger = pipe_trigger

    def sendall(self, data):
        if self.pipe_trigger not in data:
            self.data += data
            return
        raise socket.error(errno.EPIPE, "gotcha")

    def close(self):
        pass
65
class NoEOFStringIO(StringIO.StringIO):
    """StringIO variant where hitting EOF raises AssertionError.

    The tests below use it to check that httplib never asks the
    underlying file for more data than it is supposed to.
    """
    def read(self, n=-1):
        chunk = StringIO.StringIO.read(self, n)
        if chunk != '':
            return chunk
        raise AssertionError('caller tried to read past EOF')

    def readline(self, length=None):
        line = StringIO.StringIO.readline(self, length)
        if line != '':
            return line
        raise AssertionError('caller tried to read past EOF')
83
84
85class HeaderTests(TestCase):
86    def test_auto_headers(self):
87        # Some headers are added automatically, but should not be added by
88        # .request() if they are explicitly set.
89
90        class HeaderCountingBuffer(list):
91            def __init__(self):
92                self.count = {}
93            def append(self, item):
94                kv = item.split(':')
95                if len(kv) > 1:
96                    # item is a 'Key: Value' header string
97                    lcKey = kv[0].lower()
98                    self.count.setdefault(lcKey, 0)
99                    self.count[lcKey] += 1
100                list.append(self, item)
101
102        for explicit_header in True, False:
103            for header in 'Content-length', 'Host', 'Accept-encoding':
104                conn = httplib.HTTPConnection('example.com')
105                conn.sock = FakeSocket('blahblahblah')
106                conn._buffer = HeaderCountingBuffer()
107
108                body = 'spamspamspam'
109                headers = {}
110                if explicit_header:
111                    headers[header] = str(len(body))
112                conn.request('POST', '/', body, headers)
113                self.assertEqual(conn._buffer.count[header.lower()], 1)
114
115    def test_content_length_0(self):
116
117        class ContentLengthChecker(list):
118            def __init__(self):
119                list.__init__(self)
120                self.content_length = None
121            def append(self, item):
122                kv = item.split(':', 1)
123                if len(kv) > 1 and kv[0].lower() == 'content-length':
124                    self.content_length = kv[1].strip()
125                list.append(self, item)
126
127        # Here, we're testing that methods expecting a body get a
128        # content-length set to zero if the body is empty (either None or '')
129        bodies = (None, '')
130        methods_with_body = ('PUT', 'POST', 'PATCH')
131        for method, body in itertools.product(methods_with_body, bodies):
132            conn = httplib.HTTPConnection('example.com')
133            conn.sock = FakeSocket(None)
134            conn._buffer = ContentLengthChecker()
135            conn.request(method, '/', body)
136            self.assertEqual(
137                conn._buffer.content_length, '0',
138                'Header Content-Length incorrect on {}'.format(method)
139            )
140
141        # For these methods, we make sure that content-length is not set when
142        # the body is None because it might cause unexpected behaviour on the
143        # server.
144        methods_without_body = (
145             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
146        )
147        for method in methods_without_body:
148            conn = httplib.HTTPConnection('example.com')
149            conn.sock = FakeSocket(None)
150            conn._buffer = ContentLengthChecker()
151            conn.request(method, '/', None)
152            self.assertEqual(
153                conn._buffer.content_length, None,
154                'Header Content-Length set for empty body on {}'.format(method)
155            )
156
157        # If the body is set to '', that's considered to be "present but
158        # empty" rather than "missing", so content length would be set, even
159        # for methods that don't expect a body.
160        for method in methods_without_body:
161            conn = httplib.HTTPConnection('example.com')
162            conn.sock = FakeSocket(None)
163            conn._buffer = ContentLengthChecker()
164            conn.request(method, '/', '')
165            self.assertEqual(
166                conn._buffer.content_length, '0',
167                'Header Content-Length incorrect on {}'.format(method)
168            )
169
170        # If the body is set, make sure Content-Length is set.
171        for method in itertools.chain(methods_without_body, methods_with_body):
172            conn = httplib.HTTPConnection('example.com')
173            conn.sock = FakeSocket(None)
174            conn._buffer = ContentLengthChecker()
175            conn.request(method, '/', ' ')
176            self.assertEqual(
177                conn._buffer.content_length, '1',
178                'Header Content-Length incorrect on {}'.format(method)
179            )
180
181    def test_putheader(self):
182        conn = httplib.HTTPConnection('example.com')
183        conn.sock = FakeSocket(None)
184        conn.putrequest('GET','/')
185        conn.putheader('Content-length',42)
186        self.assertIn('Content-length: 42', conn._buffer)
187
188        conn.putheader('Foo', ' bar ')
189        self.assertIn(b'Foo:  bar ', conn._buffer)
190        conn.putheader('Bar', '\tbaz\t')
191        self.assertIn(b'Bar: \tbaz\t', conn._buffer)
192        conn.putheader('Authorization', 'Bearer mytoken')
193        self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
194        conn.putheader('IterHeader', 'IterA', 'IterB')
195        self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
196        conn.putheader('LatinHeader', b'\xFF')
197        self.assertIn(b'LatinHeader: \xFF', conn._buffer)
198        conn.putheader('Utf8Header', b'\xc3\x80')
199        self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
200        conn.putheader('C1-Control', b'next\x85line')
201        self.assertIn(b'C1-Control: next\x85line', conn._buffer)
202        conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
203        self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
204        conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
205        self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
206        conn.putheader('Key Space', 'value')
207        self.assertIn(b'Key Space: value', conn._buffer)
208        conn.putheader('KeySpace ', 'value')
209        self.assertIn(b'KeySpace : value', conn._buffer)
210        conn.putheader(b'Nonbreak\xa0Space', 'value')
211        self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
212        conn.putheader(b'\xa0NonbreakSpace', 'value')
213        self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
214
215    def test_ipv6host_header(self):
216        # Default host header on IPv6 transaction should be wrapped by [] if
217        # it is an IPv6 address
218        expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
219                   'Accept-Encoding: identity\r\n\r\n'
220        conn = httplib.HTTPConnection('[2001::]:81')
221        sock = FakeSocket('')
222        conn.sock = sock
223        conn.request('GET', '/foo')
224        self.assertTrue(sock.data.startswith(expected))
225
226        expected = 'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
227                   'Accept-Encoding: identity\r\n\r\n'
228        conn = httplib.HTTPConnection('[2001:102A::]')
229        sock = FakeSocket('')
230        conn.sock = sock
231        conn.request('GET', '/foo')
232        self.assertTrue(sock.data.startswith(expected))
233
234    def test_malformed_headers_coped_with(self):
235        # Issue 19996
236        body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
237        sock = FakeSocket(body)
238        resp = httplib.HTTPResponse(sock)
239        resp.begin()
240
241        self.assertEqual(resp.getheader('First'), 'val')
242        self.assertEqual(resp.getheader('Second'), 'val')
243
244    def test_malformed_truncation(self):
245        # Other malformed header lines, especially without colons, used to
246        # cause the rest of the header section to be truncated
247        resp = (
248            b'HTTP/1.1 200 OK\r\n'
249            b'Public-Key-Pins: \n'
250            b'pin-sha256="xxx=";\n'
251            b'report-uri="https://..."\r\n'
252            b'Transfer-Encoding: chunked\r\n'
253            b'\r\n'
254            b'4\r\nbody\r\n0\r\n\r\n'
255        )
256        resp = httplib.HTTPResponse(FakeSocket(resp))
257        resp.begin()
258        self.assertIsNotNone(resp.getheader('Public-Key-Pins'))
259        self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
260        self.assertEqual(resp.read(), b'body')
261
262    def test_blank_line_forms(self):
263        # Test that both CRLF and LF blank lines can terminate the header
264        # section and start the body
265        for blank in (b'\r\n', b'\n'):
266            resp = b'HTTP/1.1 200 OK\r\n' b'Transfer-Encoding: chunked\r\n'
267            resp += blank
268            resp += b'4\r\nbody\r\n0\r\n\r\n'
269            resp = httplib.HTTPResponse(FakeSocket(resp))
270            resp.begin()
271            self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
272            self.assertEqual(resp.read(), b'body')
273
274            resp = b'HTTP/1.0 200 OK\r\n' + blank + b'body'
275            resp = httplib.HTTPResponse(FakeSocket(resp))
276            resp.begin()
277            self.assertEqual(resp.read(), b'body')
278
279        # A blank line ending in CR is not treated as the end of the HTTP
280        # header section, therefore header fields following it should be
281        # parsed if possible
282        resp = (
283            b'HTTP/1.1 200 OK\r\n'
284            b'\r'
285            b'Name: value\r\n'
286            b'Transfer-Encoding: chunked\r\n'
287            b'\r\n'
288            b'4\r\nbody\r\n0\r\n\r\n'
289        )
290        resp = httplib.HTTPResponse(FakeSocket(resp))
291        resp.begin()
292        self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
293        self.assertEqual(resp.read(), b'body')
294
295        # No header fields nor blank line
296        resp = b'HTTP/1.0 200 OK\r\n'
297        resp = httplib.HTTPResponse(FakeSocket(resp))
298        resp.begin()
299        self.assertEqual(resp.read(), b'')
300
301    def test_from_line(self):
302        # The parser handles "From" lines specially, so test this does not
303        # affect parsing the rest of the header section
304        resp = (
305            b'HTTP/1.1 200 OK\r\n'
306            b'From start\r\n'
307            b' continued\r\n'
308            b'Name: value\r\n'
309            b'From middle\r\n'
310            b' continued\r\n'
311            b'Transfer-Encoding: chunked\r\n'
312            b'From end\r\n'
313            b'\r\n'
314            b'4\r\nbody\r\n0\r\n\r\n'
315        )
316        resp = httplib.HTTPResponse(FakeSocket(resp))
317        resp.begin()
318        self.assertIsNotNone(resp.getheader('Name'))
319        self.assertEqual(resp.getheader('Transfer-Encoding'), 'chunked')
320        self.assertEqual(resp.read(), b'body')
321
322        resp = (
323            b'HTTP/1.0 200 OK\r\n'
324            b'From alone\r\n'
325            b'\r\n'
326            b'body'
327        )
328        resp = httplib.HTTPResponse(FakeSocket(resp))
329        resp.begin()
330        self.assertEqual(resp.read(), b'body')
331
332    def test_parse_all_octets(self):
333        # Ensure no valid header field octet breaks the parser
334        body = (
335            b'HTTP/1.1 200 OK\r\n'
336            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
337            b'VCHAR: ' + bytearray(range(0x21, 0x7E + 1)) + b'\r\n'
338            b'obs-text: ' + bytearray(range(0x80, 0xFF + 1)) + b'\r\n'
339            b'obs-fold: text\r\n'
340            b' folded with space\r\n'
341            b'\tfolded with tab\r\n'
342            b'Content-Length: 0\r\n'
343            b'\r\n'
344        )
345        sock = FakeSocket(body)
346        resp = httplib.HTTPResponse(sock)
347        resp.begin()
348        self.assertEqual(resp.getheader('Content-Length'), '0')
349        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
350        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
351        self.assertEqual(resp.getheader('VCHAR'), vchar)
352        self.assertIsNotNone(resp.getheader('obs-text'))
353        folded = resp.getheader('obs-fold')
354        self.assertTrue(folded.startswith('text'))
355        self.assertIn(' folded with space', folded)
356        self.assertTrue(folded.endswith('folded with tab'))
357
358    def test_invalid_headers(self):
359        conn = httplib.HTTPConnection('example.com')
360        conn.sock = FakeSocket('')
361        conn.putrequest('GET', '/')
362
363        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
364        # longer allowed in header names
365        cases = (
366            (b'Invalid\r\nName', b'ValidValue'),
367            (b'Invalid\rName', b'ValidValue'),
368            (b'Invalid\nName', b'ValidValue'),
369            (b'\r\nInvalidName', b'ValidValue'),
370            (b'\rInvalidName', b'ValidValue'),
371            (b'\nInvalidName', b'ValidValue'),
372            (b' InvalidName', b'ValidValue'),
373            (b'\tInvalidName', b'ValidValue'),
374            (b'Invalid:Name', b'ValidValue'),
375            (b':InvalidName', b'ValidValue'),
376            (b'ValidName', b'Invalid\r\nValue'),
377            (b'ValidName', b'Invalid\rValue'),
378            (b'ValidName', b'Invalid\nValue'),
379            (b'ValidName', b'InvalidValue\r\n'),
380            (b'ValidName', b'InvalidValue\r'),
381            (b'ValidName', b'InvalidValue\n'),
382        )
383        for name, value in cases:
384            with self.assertRaisesRegexp(ValueError, 'Invalid header'):
385                conn.putheader(name, value)
386
387
388class BasicTest(TestCase):
389    def test_status_lines(self):
390        # Test HTTP status lines
391
392        body = "HTTP/1.1 200 Ok\r\n\r\nText"
393        sock = FakeSocket(body)
394        resp = httplib.HTTPResponse(sock)
395        resp.begin()
396        self.assertEqual(resp.read(0), '')  # Issue #20007
397        self.assertFalse(resp.isclosed())
398        self.assertEqual(resp.read(), 'Text')
399        self.assertTrue(resp.isclosed())
400
401        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
402        sock = FakeSocket(body)
403        resp = httplib.HTTPResponse(sock)
404        self.assertRaises(httplib.BadStatusLine, resp.begin)
405
406    def test_bad_status_repr(self):
407        exc = httplib.BadStatusLine('')
408        self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''')
409
410    def test_partial_reads(self):
411        # if we have a length, the system knows when to close itself
412        # same behaviour than when we read the whole thing with read()
413        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
414        sock = FakeSocket(body)
415        resp = httplib.HTTPResponse(sock)
416        resp.begin()
417        self.assertEqual(resp.read(2), 'Te')
418        self.assertFalse(resp.isclosed())
419        self.assertEqual(resp.read(2), 'xt')
420        self.assertTrue(resp.isclosed())
421
422    def test_partial_reads_no_content_length(self):
423        # when no length is present, the socket should be gracefully closed when
424        # all data was read
425        body = "HTTP/1.1 200 Ok\r\n\r\nText"
426        sock = FakeSocket(body)
427        resp = httplib.HTTPResponse(sock)
428        resp.begin()
429        self.assertEqual(resp.read(2), 'Te')
430        self.assertFalse(resp.isclosed())
431        self.assertEqual(resp.read(2), 'xt')
432        self.assertEqual(resp.read(1), '')
433        self.assertTrue(resp.isclosed())
434
435    def test_partial_reads_incomplete_body(self):
436        # if the server shuts down the connection before the whole
437        # content-length is delivered, the socket is gracefully closed
438        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
439        sock = FakeSocket(body)
440        resp = httplib.HTTPResponse(sock)
441        resp.begin()
442        self.assertEqual(resp.read(2), 'Te')
443        self.assertFalse(resp.isclosed())
444        self.assertEqual(resp.read(2), 'xt')
445        self.assertEqual(resp.read(1), '')
446        self.assertTrue(resp.isclosed())
447
448    def test_host_port(self):
449        # Check invalid host_port
450
451        # Note that httplib does not accept user:password@ in the host-port.
452        for hp in ("www.python.org:abc", "user:password@www.python.org"):
453            self.assertRaises(httplib.InvalidURL, httplib.HTTP, hp)
454
455        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b",
456                          8000),
457                         ("www.python.org:80", "www.python.org", 80),
458                         ("www.python.org", "www.python.org", 80),
459                         ("www.python.org:", "www.python.org", 80),
460                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
461            http = httplib.HTTP(hp)
462            c = http._conn
463            if h != c.host:
464                self.fail("Host incorrectly parsed: %s != %s" % (h, c.host))
465            if p != c.port:
466                self.fail("Port incorrectly parsed: %s != %s" % (p, c.host))
467
468    def test_response_headers(self):
469        # test response with multiple message headers with the same field name.
470        text = ('HTTP/1.1 200 OK\r\n'
471                'Set-Cookie: Customer="WILE_E_COYOTE";'
472                ' Version="1"; Path="/acme"\r\n'
473                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
474                ' Path="/acme"\r\n'
475                '\r\n'
476                'No body\r\n')
477        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
478               ', '
479               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
480        s = FakeSocket(text)
481        r = httplib.HTTPResponse(s)
482        r.begin()
483        cookies = r.getheader("Set-Cookie")
484        if cookies != hdr:
485            self.fail("multiple headers not combined properly")
486
487    def test_read_head(self):
488        # Test that the library doesn't attempt to read any data
489        # from a HEAD request.  (Tickles SF bug #622042.)
490        sock = FakeSocket(
491            'HTTP/1.1 200 OK\r\n'
492            'Content-Length: 14432\r\n'
493            '\r\n',
494            NoEOFStringIO)
495        resp = httplib.HTTPResponse(sock, method="HEAD")
496        resp.begin()
497        if resp.read() != "":
498            self.fail("Did not expect response from HEAD request")
499
500    def test_too_many_headers(self):
501        headers = '\r\n'.join('Header%d: foo' % i for i in xrange(200)) + '\r\n'
502        text = ('HTTP/1.1 200 OK\r\n' + headers)
503        s = FakeSocket(text)
504        r = httplib.HTTPResponse(s)
505        self.assertRaises(httplib.HTTPException, r.begin)
506
507    def test_send_file(self):
508        expected = 'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
509                   'Accept-Encoding: identity\r\nContent-Length:'
510
511        body = open(__file__, 'rb')
512        conn = httplib.HTTPConnection('example.com')
513        sock = FakeSocket(body)
514        conn.sock = sock
515        conn.request('GET', '/foo', body)
516        self.assertTrue(sock.data.startswith(expected))
517        self.assertIn('def test_send_file', sock.data)
518
519    def test_send_tempfile(self):
520        expected = ('GET /foo HTTP/1.1\r\nHost: example.com\r\n'
521                    'Accept-Encoding: identity\r\nContent-Length: 9\r\n\r\n'
522                    'fake\ndata')
523
524        with tempfile.TemporaryFile() as body:
525            body.write('fake\ndata')
526            body.seek(0)
527
528            conn = httplib.HTTPConnection('example.com')
529            sock = FakeSocket(body)
530            conn.sock = sock
531            conn.request('GET', '/foo', body)
532        self.assertEqual(sock.data, expected)
533
534    def test_send(self):
535        expected = 'this is a test this is only a test'
536        conn = httplib.HTTPConnection('example.com')
537        sock = FakeSocket(None)
538        conn.sock = sock
539        conn.send(expected)
540        self.assertEqual(expected, sock.data)
541        sock.data = ''
542        conn.send(array.array('c', expected))
543        self.assertEqual(expected, sock.data)
544        sock.data = ''
545        conn.send(StringIO.StringIO(expected))
546        self.assertEqual(expected, sock.data)
547
548    def test_chunked(self):
549        chunked_start = (
550            'HTTP/1.1 200 OK\r\n'
551            'Transfer-Encoding: chunked\r\n\r\n'
552            'a\r\n'
553            'hello worl\r\n'
554            '1\r\n'
555            'd\r\n'
556        )
557        sock = FakeSocket(chunked_start + '0\r\n')
558        resp = httplib.HTTPResponse(sock, method="GET")
559        resp.begin()
560        self.assertEqual(resp.read(), 'hello world')
561        resp.close()
562
563        for x in ('', 'foo\r\n'):
564            sock = FakeSocket(chunked_start + x)
565            resp = httplib.HTTPResponse(sock, method="GET")
566            resp.begin()
567            try:
568                resp.read()
569            except httplib.IncompleteRead, i:
570                self.assertEqual(i.partial, 'hello world')
571                self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
572                self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
573            else:
574                self.fail('IncompleteRead expected')
575            finally:
576                resp.close()
577
578    def test_chunked_head(self):
579        chunked_start = (
580            'HTTP/1.1 200 OK\r\n'
581            'Transfer-Encoding: chunked\r\n\r\n'
582            'a\r\n'
583            'hello world\r\n'
584            '1\r\n'
585            'd\r\n'
586        )
587        sock = FakeSocket(chunked_start + '0\r\n')
588        resp = httplib.HTTPResponse(sock, method="HEAD")
589        resp.begin()
590        self.assertEqual(resp.read(), '')
591        self.assertEqual(resp.status, 200)
592        self.assertEqual(resp.reason, 'OK')
593        self.assertTrue(resp.isclosed())
594
595    def test_negative_content_length(self):
596        sock = FakeSocket('HTTP/1.1 200 OK\r\n'
597                          'Content-Length: -1\r\n\r\nHello\r\n')
598        resp = httplib.HTTPResponse(sock, method="GET")
599        resp.begin()
600        self.assertEqual(resp.read(), 'Hello\r\n')
601        self.assertTrue(resp.isclosed())
602
603    def test_incomplete_read(self):
604        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
605        resp = httplib.HTTPResponse(sock, method="GET")
606        resp.begin()
607        try:
608            resp.read()
609        except httplib.IncompleteRead as i:
610            self.assertEqual(i.partial, 'Hello\r\n')
611            self.assertEqual(repr(i),
612                             "IncompleteRead(7 bytes read, 3 more expected)")
613            self.assertEqual(str(i),
614                             "IncompleteRead(7 bytes read, 3 more expected)")
615            self.assertTrue(resp.isclosed())
616        else:
617            self.fail('IncompleteRead expected')
618
619    def test_epipe(self):
620        sock = EPipeSocket(
621            "HTTP/1.0 401 Authorization Required\r\n"
622            "Content-type: text/html\r\n"
623            "WWW-Authenticate: Basic realm=\"example\"\r\n",
624            b"Content-Length")
625        conn = httplib.HTTPConnection("example.com")
626        conn.sock = sock
627        self.assertRaises(socket.error,
628                          lambda: conn.request("PUT", "/url", "body"))
629        resp = conn.getresponse()
630        self.assertEqual(401, resp.status)
631        self.assertEqual("Basic realm=\"example\"",
632                         resp.getheader("www-authenticate"))
633
634    def test_filenoattr(self):
635        # Just test the fileno attribute in the HTTPResponse Object.
636        body = "HTTP/1.1 200 Ok\r\n\r\nText"
637        sock = FakeSocket(body)
638        resp = httplib.HTTPResponse(sock)
639        self.assertTrue(hasattr(resp,'fileno'),
640                'HTTPResponse should expose a fileno attribute')
641
642    # Test lines overflowing the max line size (_MAXLINE in httplib)
643
644    def test_overflowing_status_line(self):
645        self.skipTest("disabled for HTTP 0.9 support")
646        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
647        resp = httplib.HTTPResponse(FakeSocket(body))
648        self.assertRaises((httplib.LineTooLong, httplib.BadStatusLine), resp.begin)
649
650    def test_overflowing_header_line(self):
651        body = (
652            'HTTP/1.1 200 OK\r\n'
653            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
654        )
655        resp = httplib.HTTPResponse(FakeSocket(body))
656        self.assertRaises(httplib.LineTooLong, resp.begin)
657
658    def test_overflowing_chunked_line(self):
659        body = (
660            'HTTP/1.1 200 OK\r\n'
661            'Transfer-Encoding: chunked\r\n\r\n'
662            + '0' * 65536 + 'a\r\n'
663            'hello world\r\n'
664            '0\r\n'
665        )
666        resp = httplib.HTTPResponse(FakeSocket(body))
667        resp.begin()
668        self.assertRaises(httplib.LineTooLong, resp.read)
669
670    def test_early_eof(self):
671        # Test httpresponse with no \r\n termination,
672        body = "HTTP/1.1 200 Ok"
673        sock = FakeSocket(body)
674        resp = httplib.HTTPResponse(sock)
675        resp.begin()
676        self.assertEqual(resp.read(), '')
677        self.assertTrue(resp.isclosed())
678
679    def test_error_leak(self):
680        # Test that the socket is not leaked if getresponse() fails
681        conn = httplib.HTTPConnection('example.com')
682        response = []
683        class Response(httplib.HTTPResponse):
684            def __init__(self, *pos, **kw):
685                response.append(self)  # Avoid garbage collector closing the socket
686                httplib.HTTPResponse.__init__(self, *pos, **kw)
687        conn.response_class = Response
688        conn.sock = FakeSocket('')  # Emulate server dropping connection
689        conn.request('GET', '/')
690        self.assertRaises(httplib.BadStatusLine, conn.getresponse)
691        self.assertTrue(response)
692        #self.assertTrue(response[0].closed)
693        self.assertTrue(conn.sock.file_closed)
694
695    def test_proxy_tunnel_without_status_line(self):
696        # Issue 17849: If a proxy tunnel is created that does not return
697        # a status code, fail.
698        body = 'hello world'
699        conn = httplib.HTTPConnection('example.com', strict=False)
700        conn.set_tunnel('foo')
701        conn.sock = FakeSocket(body)
702        with self.assertRaisesRegexp(socket.error, "Invalid response"):
703            conn._tunnel()
704
class OfflineTest(TestCase):
    def test_responses(self):
        # The responses table maps NOT_FOUND to its reason phrase.
        reason = httplib.responses[httplib.NOT_FOUND]
        self.assertEqual(reason, "Not Found")
708
709
class TestServerMixin:
    """Mixin that provides a minimal listening socket.

    Test cases exercising http connection end points use it: setUp()
    creates the listening socket, tearDown() closes it together with any
    connection stored in ``self.conn``.
    """
    def setUp(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        self.port = test_support.bind_port(listener)
        self.source_port = test_support.find_unused_port()
        listener.listen(5)
        self.conn = None

    def tearDown(self):
        conn = self.conn
        if conn:
            conn.close()
            self.conn = None
        listener = self.serv
        listener.close()
        self.serv = None
728
class SourceAddressTest(TestServerMixin, TestCase):
    def testHTTPConnectionSourceAddress(self):
        # After connect() the local end of the socket must use the
        # requested source port.
        source = ('', self.source_port)
        self.conn = httplib.HTTPConnection(HOST, self.port,
                                           source_address=source)
        self.conn.connect()
        local_port = self.conn.sock.getsockname()[1]
        self.assertEqual(local_port, self.source_port)

    @unittest.skipUnless(hasattr(httplib, 'HTTPSConnection'),
                         'httplib.HTTPSConnection not defined')
    def testHTTPSConnectionSourceAddress(self):
        source = ('', self.source_port)
        self.conn = httplib.HTTPSConnection(HOST, self.port,
                                            source_address=source)
        # Nothing is checked beyond the constructor not failing: this code
        # does not set up an active running SSL server that an ssl-wrapped
        # connect() could actually return from.
744
745
class HTTPTest(TestServerMixin, TestCase):
    def testHTTPConnection(self):
        # Host and port given to the constructor end up on the wrapped
        # connection object.
        self.conn = httplib.HTTP(host=HOST, port=self.port, strict=None)
        self.conn.connect()
        wrapped = self.conn._conn
        self.assertEqual(wrapped.host, HOST)
        self.assertEqual(wrapped.port, self.port)

    def testHTTPWithConnectHostPort(self):
        # Host and port passed to connect() replace the ones given to the
        # constructor.
        testhost = 'unreachable.test.domain'
        testport = '80'
        self.conn = httplib.HTTP(host=testhost, port=testport)
        self.conn.connect(host=HOST, port=self.port)
        wrapped = self.conn._conn
        self.assertNotEqual(wrapped.host, testhost)
        self.assertNotEqual(wrapped.port, testport)
        self.assertEqual(wrapped.host, HOST)
        self.assertEqual(wrapped.port, self.port)
762
763
class TimeoutTest(TestCase):
    # Port of the listening socket; assigned by setUp().
    PORT = None

    def setUp(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv = listener
        TimeoutTest.PORT = test_support.bind_port(listener)
        listener.listen(5)

    def tearDown(self):
        listener = self.serv
        listener.close()
        self.serv = None

    def testTimeoutAttribute(self):
        '''This will prove that the timeout gets through
        HTTPConnection and into the socket.
        '''
        # Both rounds run with a global default timeout of 30 in effect:
        #  - no timeout argument: the global socket timeout is used
        #  - timeout=None: the global default is not used
        rounds = (
            ({}, 30),
            ({'timeout': None}, None),
        )
        for extra_args, expected_timeout in rounds:
            self.assertIsNone(socket.getdefaulttimeout())
            socket.setdefaulttimeout(30)
            try:
                httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
                                                  **extra_args)
                httpConn.connect()
            finally:
                # always restore the global default
                socket.setdefaulttimeout(None)
            self.assertEqual(httpConn.sock.gettimeout(), expected_timeout)
            httpConn.close()

        # an explicit value
        httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
        httpConn.connect()
        self.assertEqual(httpConn.sock.gettimeout(), 30)
        httpConn.close()
808
809
class HTTPSTest(TestCase):
    """Tests for HTTPSConnection: certificate checks, hostname checks and
    host/port parsing.

    Connections that may actually open a socket are created through
    _https_connection() so that they are always closed when the test ends.
    """

    def setUp(self):
        """Skip every test in this class when httplib has no HTTPS support."""
        if not hasattr(httplib, 'HTTPSConnection'):
            self.skipTest('ssl support required')

    def make_server(self, certfile):
        """Start a local HTTPS server using *certfile* and return it.

        The work is delegated to test.ssl_servers.make_https_server, which
        also receives this test case instance.
        """
        from test.ssl_servers import make_https_server
        return make_https_server(self, certfile=certfile)

    def _https_connection(self, *args, **kwargs):
        """Create an HTTPSConnection that is closed automatically.

        Arguments are passed straight to httplib.HTTPSConnection.  close()
        is registered as a cleanup so the socket is released whether the
        test passes, fails or raises.
        """
        conn = httplib.HTTPSConnection(*args, **kwargs)
        self.addCleanup(conn.close)
        return conn

    def test_attributes(self):
        # simple test to check it's storing the timeout
        # (TimeoutTest.PORT is None unless TimeoutTest.setUp has run; no
        # connection is made here, so only the stored timeout matters.)
        h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
        self.assertEqual(h.timeout, 30)

    def test_networked(self):
        # Default settings: requires a valid cert from a trusted CA
        import ssl
        test_support.requires('network')
        with test_support.transient_internet('self-signed.pythontest.net'):
            h = self._https_connection('self-signed.pythontest.net', 443)
            with self.assertRaises(ssl.SSLError) as exc_info:
                h.request('GET', '/')
            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')

    def test_networked_noverification(self):
        # Switch off cert verification
        import ssl
        test_support.requires('network')
        with test_support.transient_internet('self-signed.pythontest.net'):
            context = ssl._create_stdlib_context()
            h = self._https_connection('self-signed.pythontest.net', 443,
                                       context=context)
            h.request('GET', '/')
            resp = h.getresponse()
            self.assertIn('nginx', resp.getheader('server'))

    @test_support.system_must_validate_cert
    def test_networked_trusted_by_default_cert(self):
        # Default settings: requires a valid cert from a trusted CA
        test_support.requires('network')
        with test_support.transient_internet('www.python.org'):
            h = self._https_connection('www.python.org', 443)
            h.request('GET', '/')
            resp = h.getresponse()
            content_type = resp.getheader('content-type')
            self.assertIn('text/html', content_type)

    def test_networked_good_cert(self):
        # We feed the server's cert as a validating cert
        import ssl
        test_support.requires('network')
        with test_support.transient_internet('self-signed.pythontest.net'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
            h = self._https_connection('self-signed.pythontest.net', 443,
                                       context=context)
            h.request('GET', '/')
            resp = h.getresponse()
            server_string = resp.getheader('server')
            self.assertIn('nginx', server_string)

    def test_networked_bad_cert(self):
        # We feed a "CA" cert that is unrelated to the server's cert
        import ssl
        test_support.requires('network')
        with test_support.transient_internet('self-signed.pythontest.net'):
            context = ssl.SSLContext(ssl.PROTOCOL_TLS)
            context.verify_mode = ssl.CERT_REQUIRED
            context.load_verify_locations(CERT_localhost)
            h = self._https_connection('self-signed.pythontest.net', 443,
                                       context=context)
            with self.assertRaises(ssl.SSLError) as exc_info:
                h.request('GET', '/')
            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')

    def test_local_unknown_cert(self):
        # The custom cert isn't known to the default trust bundle
        import ssl
        server = self.make_server(CERT_localhost)
        h = self._https_connection('localhost', server.port)
        with self.assertRaises(ssl.SSLError) as exc_info:
            h.request('GET', '/')
        self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')

    def test_local_good_hostname(self):
        # The (valid) cert validates the HTTP hostname
        import ssl
        server = self.make_server(CERT_localhost)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(CERT_localhost)
        h = self._https_connection('localhost', server.port, context=context)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)

    def test_local_bad_hostname(self):
        # The (valid) cert doesn't validate the HTTP hostname
        import ssl
        server = self.make_server(CERT_fakehostname)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = True
        context.load_verify_locations(CERT_fakehostname)
        h = self._https_connection('localhost', server.port, context=context)
        with self.assertRaises(ssl.CertificateError):
            h.request('GET', '/')
        # Drop the failed connection before talking to the server again.
        h.close()
        # With context.check_hostname=False, the mismatching is ignored
        context.check_hostname = False
        h = self._https_connection('localhost', server.port, context=context)
        h.request('GET', '/nonexistent')
        resp = h.getresponse()
        self.assertEqual(resp.status, 404)

    def test_host_port(self):
        # Check invalid host_port

        for hp in ("www.python.org:abc", "user:password@www.python.org"):
            self.assertRaises(httplib.InvalidURL, httplib.HTTPSConnection, hp)

        # Each entry is (host:port string, expected host, expected port);
        # 443 is expected whenever the port part is missing or empty.
        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
                          "fe80::207:e9ff:fe9b", 8000),
                         ("www.python.org:443", "www.python.org", 443),
                         ("www.python.org:", "www.python.org", 443),
                         ("www.python.org", "www.python.org", 443),
                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
                             443)):
            c = httplib.HTTPSConnection(hp)
            self.assertEqual(h, c.host)
            self.assertEqual(p, c.port)
942
943
class TunnelTests(TestCase):
    """Tests for HTTPConnection.set_tunnel() using a fake socket."""

    def test_connect(self):
        # Canned bytes that every fake socket will hand back to httplib.
        response_text = (
            'HTTP/1.0 200 OK\r\n\r\n'   # Reply to CONNECT
            'HTTP/1.1 200 OK\r\n'       # Reply to HEAD
            'Content-Length: 42\r\n\r\n'
        )

        def create_connection(address, timeout=None, source_address=None):
            # Stand-in for the real socket factory: no network traffic, but
            # the fake socket remembers which host/port was requested.
            return FakeSocket(response_text, host=address[0], port=address[1])

        conn = httplib.HTTPConnection('proxy.com')
        conn._create_connection = create_connection

        # Once connected, we should not be able to tunnel anymore
        conn.connect()
        self.assertRaises(RuntimeError, conn.set_tunnel, 'destination.com')

        # But if close the connection, we are good.
        conn.close()
        conn.set_tunnel('destination.com')
        conn.request('HEAD', '/', '')

        # The socket must go to the proxy, while the CONNECT line and the
        # Host header must name the tunnel destination.
        self.assertEqual(conn.sock.host, 'proxy.com')
        self.assertEqual(conn.sock.port, 80)
        self.assertIn('CONNECT destination.com', conn.sock.data)
        # issue22095
        self.assertNotIn('Host: destination.com:None', conn.sock.data)
        self.assertIn('Host: destination.com', conn.sock.data)

        self.assertNotIn('Host: proxy.com', conn.sock.data)

        conn.close()

        # The tunnel settings must still apply when the connection is
        # reopened by a later request.
        conn.request('PUT', '/', '')
        self.assertEqual(conn.sock.host, 'proxy.com')
        self.assertEqual(conn.sock.port, 80)
        self.assertIn('CONNECT destination.com', conn.sock.data)
        self.assertIn('Host: destination.com', conn.sock.data)
983
984
@test_support.reap_threads
def test_main(verbose=None):
    """Run the httplib test classes listed below.

    ``verbose`` is accepted but not used by this function.
    """
    test_classes = (
        HeaderTests,
        OfflineTest,
        BasicTest,
        TimeoutTest,
        HTTPTest,
        HTTPSTest,
        SourceAddressTest,
        TunnelTests,
    )
    test_support.run_unittest(*test_classes)


# Run the suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
993