1import errno
2from http import client
3import io
4import itertools
5import os
6import array
7import re
8import socket
9import threading
10import warnings
11
12import unittest
13TestCase = unittest.TestCase
14
15from test import support
16from test.support import socket_helper
17
18here = os.path.dirname(__file__)
19# Self-signed cert file for 'localhost'
20CERT_localhost = os.path.join(here, 'keycert.pem')
21# Self-signed cert file for 'fakehostname'
22CERT_fakehostname = os.path.join(here, 'keycert2.pem')
23# Self-signed cert file for self-signed.pythontest.net
24CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem')
25
26# constants for testing chunked encoding
27chunked_start = (
28    'HTTP/1.1 200 OK\r\n'
29    'Transfer-Encoding: chunked\r\n\r\n'
30    'a\r\n'
31    'hello worl\r\n'
32    '3\r\n'
33    'd! \r\n'
34    '8\r\n'
35    'and now \r\n'
36    '22\r\n'
37    'for something completely different\r\n'
38)
39chunked_expected = b'hello world! and now for something completely different'
40chunk_extension = ";foo=bar"
41last_chunk = "0\r\n"
42last_chunk_extended = "0" + chunk_extension + "\r\n"
43trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n"
44chunked_end = "\r\n"
45
46HOST = socket_helper.HOST
47
48class FakeSocket:
49    def __init__(self, text, fileclass=io.BytesIO, host=None, port=None):
50        if isinstance(text, str):
51            text = text.encode("ascii")
52        self.text = text
53        self.fileclass = fileclass
54        self.data = b''
55        self.sendall_calls = 0
56        self.file_closed = False
57        self.host = host
58        self.port = port
59
60    def sendall(self, data):
61        self.sendall_calls += 1
62        self.data += data
63
64    def makefile(self, mode, bufsize=None):
65        if mode != 'r' and mode != 'rb':
66            raise client.UnimplementedFileMode()
67        # keep the file around so we can check how much was read from it
68        self.file = self.fileclass(self.text)
69        self.file.close = self.file_close #nerf close ()
70        return self.file
71
72    def file_close(self):
73        self.file_closed = True
74
75    def close(self):
76        pass
77
78    def setsockopt(self, level, optname, value):
79        pass
80
81class EPipeSocket(FakeSocket):
82
83    def __init__(self, text, pipe_trigger):
84        # When sendall() is called with pipe_trigger, raise EPIPE.
85        FakeSocket.__init__(self, text)
86        self.pipe_trigger = pipe_trigger
87
88    def sendall(self, data):
89        if self.pipe_trigger in data:
90            raise OSError(errno.EPIPE, "gotcha")
91        self.data += data
92
93    def close(self):
94        pass
95
96class NoEOFBytesIO(io.BytesIO):
97    """Like BytesIO, but raises AssertionError on EOF.
98
99    This is used below to test that http.client doesn't try to read
100    more from the underlying file than it should.
101    """
102    def read(self, n=-1):
103        data = io.BytesIO.read(self, n)
104        if data == b'':
105            raise AssertionError('caller tried to read past EOF')
106        return data
107
108    def readline(self, length=None):
109        data = io.BytesIO.readline(self, length)
110        if data == b'':
111            raise AssertionError('caller tried to read past EOF')
112        return data
113
114class FakeSocketHTTPConnection(client.HTTPConnection):
115    """HTTPConnection subclass using FakeSocket; counts connect() calls"""
116
117    def __init__(self, *args):
118        self.connections = 0
119        super().__init__('example.com')
120        self.fake_socket_args = args
121        self._create_connection = self.create_connection
122
123    def connect(self):
124        """Count the number of times connect() is invoked"""
125        self.connections += 1
126        return super().connect()
127
128    def create_connection(self, *pos, **kw):
129        return FakeSocket(*self.fake_socket_args)
130
131class HeaderTests(TestCase):
132    def test_auto_headers(self):
133        # Some headers are added automatically, but should not be added by
134        # .request() if they are explicitly set.
135
136        class HeaderCountingBuffer(list):
137            def __init__(self):
138                self.count = {}
139            def append(self, item):
140                kv = item.split(b':')
141                if len(kv) > 1:
142                    # item is a 'Key: Value' header string
143                    lcKey = kv[0].decode('ascii').lower()
144                    self.count.setdefault(lcKey, 0)
145                    self.count[lcKey] += 1
146                list.append(self, item)
147
148        for explicit_header in True, False:
149            for header in 'Content-length', 'Host', 'Accept-encoding':
150                conn = client.HTTPConnection('example.com')
151                conn.sock = FakeSocket('blahblahblah')
152                conn._buffer = HeaderCountingBuffer()
153
154                body = 'spamspamspam'
155                headers = {}
156                if explicit_header:
157                    headers[header] = str(len(body))
158                conn.request('POST', '/', body, headers)
159                self.assertEqual(conn._buffer.count[header.lower()], 1)
160
161    def test_content_length_0(self):
162
163        class ContentLengthChecker(list):
164            def __init__(self):
165                list.__init__(self)
166                self.content_length = None
167            def append(self, item):
168                kv = item.split(b':', 1)
169                if len(kv) > 1 and kv[0].lower() == b'content-length':
170                    self.content_length = kv[1].strip()
171                list.append(self, item)
172
173        # Here, we're testing that methods expecting a body get a
174        # content-length set to zero if the body is empty (either None or '')
175        bodies = (None, '')
176        methods_with_body = ('PUT', 'POST', 'PATCH')
177        for method, body in itertools.product(methods_with_body, bodies):
178            conn = client.HTTPConnection('example.com')
179            conn.sock = FakeSocket(None)
180            conn._buffer = ContentLengthChecker()
181            conn.request(method, '/', body)
182            self.assertEqual(
183                conn._buffer.content_length, b'0',
184                'Header Content-Length incorrect on {}'.format(method)
185            )
186
187        # For these methods, we make sure that content-length is not set when
188        # the body is None because it might cause unexpected behaviour on the
189        # server.
190        methods_without_body = (
191             'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE',
192        )
193        for method in methods_without_body:
194            conn = client.HTTPConnection('example.com')
195            conn.sock = FakeSocket(None)
196            conn._buffer = ContentLengthChecker()
197            conn.request(method, '/', None)
198            self.assertEqual(
199                conn._buffer.content_length, None,
200                'Header Content-Length set for empty body on {}'.format(method)
201            )
202
203        # If the body is set to '', that's considered to be "present but
204        # empty" rather than "missing", so content length would be set, even
205        # for methods that don't expect a body.
206        for method in methods_without_body:
207            conn = client.HTTPConnection('example.com')
208            conn.sock = FakeSocket(None)
209            conn._buffer = ContentLengthChecker()
210            conn.request(method, '/', '')
211            self.assertEqual(
212                conn._buffer.content_length, b'0',
213                'Header Content-Length incorrect on {}'.format(method)
214            )
215
216        # If the body is set, make sure Content-Length is set.
217        for method in itertools.chain(methods_without_body, methods_with_body):
218            conn = client.HTTPConnection('example.com')
219            conn.sock = FakeSocket(None)
220            conn._buffer = ContentLengthChecker()
221            conn.request(method, '/', ' ')
222            self.assertEqual(
223                conn._buffer.content_length, b'1',
224                'Header Content-Length incorrect on {}'.format(method)
225            )
226
227    def test_putheader(self):
228        conn = client.HTTPConnection('example.com')
229        conn.sock = FakeSocket(None)
230        conn.putrequest('GET','/')
231        conn.putheader('Content-length', 42)
232        self.assertIn(b'Content-length: 42', conn._buffer)
233
234        conn.putheader('Foo', ' bar ')
235        self.assertIn(b'Foo:  bar ', conn._buffer)
236        conn.putheader('Bar', '\tbaz\t')
237        self.assertIn(b'Bar: \tbaz\t', conn._buffer)
238        conn.putheader('Authorization', 'Bearer mytoken')
239        self.assertIn(b'Authorization: Bearer mytoken', conn._buffer)
240        conn.putheader('IterHeader', 'IterA', 'IterB')
241        self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer)
242        conn.putheader('LatinHeader', b'\xFF')
243        self.assertIn(b'LatinHeader: \xFF', conn._buffer)
244        conn.putheader('Utf8Header', b'\xc3\x80')
245        self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer)
246        conn.putheader('C1-Control', b'next\x85line')
247        self.assertIn(b'C1-Control: next\x85line', conn._buffer)
248        conn.putheader('Embedded-Fold-Space', 'is\r\n allowed')
249        self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer)
250        conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed')
251        self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer)
252        conn.putheader('Key Space', 'value')
253        self.assertIn(b'Key Space: value', conn._buffer)
254        conn.putheader('KeySpace ', 'value')
255        self.assertIn(b'KeySpace : value', conn._buffer)
256        conn.putheader(b'Nonbreak\xa0Space', 'value')
257        self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer)
258        conn.putheader(b'\xa0NonbreakSpace', 'value')
259        self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer)
260
261    def test_ipv6host_header(self):
262        # Default host header on IPv6 transaction should be wrapped by [] if
263        # it is an IPv6 address
264        expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
265                   b'Accept-Encoding: identity\r\n\r\n'
266        conn = client.HTTPConnection('[2001::]:81')
267        sock = FakeSocket('')
268        conn.sock = sock
269        conn.request('GET', '/foo')
270        self.assertTrue(sock.data.startswith(expected))
271
272        expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
273                   b'Accept-Encoding: identity\r\n\r\n'
274        conn = client.HTTPConnection('[2001:102A::]')
275        sock = FakeSocket('')
276        conn.sock = sock
277        conn.request('GET', '/foo')
278        self.assertTrue(sock.data.startswith(expected))
279
280    def test_malformed_headers_coped_with(self):
281        # Issue 19996
282        body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n"
283        sock = FakeSocket(body)
284        resp = client.HTTPResponse(sock)
285        resp.begin()
286
287        self.assertEqual(resp.getheader('First'), 'val')
288        self.assertEqual(resp.getheader('Second'), 'val')
289
290    def test_parse_all_octets(self):
291        # Ensure no valid header field octet breaks the parser
292        body = (
293            b'HTTP/1.1 200 OK\r\n'
294            b"!#$%&'*+-.^_`|~: value\r\n"  # Special token characters
295            b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n'
296            b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n'
297            b'obs-fold: text\r\n'
298            b' folded with space\r\n'
299            b'\tfolded with tab\r\n'
300            b'Content-Length: 0\r\n'
301            b'\r\n'
302        )
303        sock = FakeSocket(body)
304        resp = client.HTTPResponse(sock)
305        resp.begin()
306        self.assertEqual(resp.getheader('Content-Length'), '0')
307        self.assertEqual(resp.msg['Content-Length'], '0')
308        self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value')
309        self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value')
310        vchar = ''.join(map(chr, range(0x21, 0x7E + 1)))
311        self.assertEqual(resp.getheader('VCHAR'), vchar)
312        self.assertEqual(resp.msg['VCHAR'], vchar)
313        self.assertIsNotNone(resp.getheader('obs-text'))
314        self.assertIn('obs-text', resp.msg)
315        for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']):
316            self.assertTrue(folded.startswith('text'))
317            self.assertIn(' folded with space', folded)
318            self.assertTrue(folded.endswith('folded with tab'))
319
320    def test_invalid_headers(self):
321        conn = client.HTTPConnection('example.com')
322        conn.sock = FakeSocket('')
323        conn.putrequest('GET', '/')
324
325        # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
326        # longer allowed in header names
327        cases = (
328            (b'Invalid\r\nName', b'ValidValue'),
329            (b'Invalid\rName', b'ValidValue'),
330            (b'Invalid\nName', b'ValidValue'),
331            (b'\r\nInvalidName', b'ValidValue'),
332            (b'\rInvalidName', b'ValidValue'),
333            (b'\nInvalidName', b'ValidValue'),
334            (b' InvalidName', b'ValidValue'),
335            (b'\tInvalidName', b'ValidValue'),
336            (b'Invalid:Name', b'ValidValue'),
337            (b':InvalidName', b'ValidValue'),
338            (b'ValidName', b'Invalid\r\nValue'),
339            (b'ValidName', b'Invalid\rValue'),
340            (b'ValidName', b'Invalid\nValue'),
341            (b'ValidName', b'InvalidValue\r\n'),
342            (b'ValidName', b'InvalidValue\r'),
343            (b'ValidName', b'InvalidValue\n'),
344        )
345        for name, value in cases:
346            with self.subTest((name, value)):
347                with self.assertRaisesRegex(ValueError, 'Invalid header'):
348                    conn.putheader(name, value)
349
350    def test_headers_debuglevel(self):
351        body = (
352            b'HTTP/1.1 200 OK\r\n'
353            b'First: val\r\n'
354            b'Second: val1\r\n'
355            b'Second: val2\r\n'
356        )
357        sock = FakeSocket(body)
358        resp = client.HTTPResponse(sock, debuglevel=1)
359        with support.captured_stdout() as output:
360            resp.begin()
361        lines = output.getvalue().splitlines()
362        self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'")
363        self.assertEqual(lines[1], "header: First: val")
364        self.assertEqual(lines[2], "header: Second: val1")
365        self.assertEqual(lines[3], "header: Second: val2")
366
367
368class HttpMethodTests(TestCase):
369    def test_invalid_method_names(self):
370        methods = (
371            'GET\r',
372            'POST\n',
373            'PUT\n\r',
374            'POST\nValue',
375            'POST\nHOST:abc',
376            'GET\nrHost:abc\n',
377            'POST\rRemainder:\r',
378            'GET\rHOST:\n',
379            '\nPUT'
380        )
381
382        for method in methods:
383            with self.assertRaisesRegex(
384                    ValueError, "method can't contain control characters"):
385                conn = client.HTTPConnection('example.com')
386                conn.sock = FakeSocket(None)
387                conn.request(method=method, url="/")
388
389
390class TransferEncodingTest(TestCase):
391    expected_body = b"It's just a flesh wound"
392
393    def test_endheaders_chunked(self):
394        conn = client.HTTPConnection('example.com')
395        conn.sock = FakeSocket(b'')
396        conn.putrequest('POST', '/')
397        conn.endheaders(self._make_body(), encode_chunked=True)
398
399        _, _, body = self._parse_request(conn.sock.data)
400        body = self._parse_chunked(body)
401        self.assertEqual(body, self.expected_body)
402
403    def test_explicit_headers(self):
404        # explicit chunked
405        conn = client.HTTPConnection('example.com')
406        conn.sock = FakeSocket(b'')
407        # this shouldn't actually be automatically chunk-encoded because the
408        # calling code has explicitly stated that it's taking care of it
409        conn.request(
410            'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'})
411
412        _, headers, body = self._parse_request(conn.sock.data)
413        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
414        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
415        self.assertEqual(body, self.expected_body)
416
417        # explicit chunked, string body
418        conn = client.HTTPConnection('example.com')
419        conn.sock = FakeSocket(b'')
420        conn.request(
421            'POST', '/', self.expected_body.decode('latin-1'),
422            {'Transfer-Encoding': 'chunked'})
423
424        _, headers, body = self._parse_request(conn.sock.data)
425        self.assertNotIn('content-length', [k.lower() for k in headers.keys()])
426        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
427        self.assertEqual(body, self.expected_body)
428
429        # User-specified TE, but request() does the chunk encoding
430        conn = client.HTTPConnection('example.com')
431        conn.sock = FakeSocket(b'')
432        conn.request('POST', '/',
433            headers={'Transfer-Encoding': 'gzip, chunked'},
434            encode_chunked=True,
435            body=self._make_body())
436        _, headers, body = self._parse_request(conn.sock.data)
437        self.assertNotIn('content-length', [k.lower() for k in headers])
438        self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked')
439        self.assertEqual(self._parse_chunked(body), self.expected_body)
440
441    def test_request(self):
442        for empty_lines in (False, True,):
443            conn = client.HTTPConnection('example.com')
444            conn.sock = FakeSocket(b'')
445            conn.request(
446                'POST', '/', self._make_body(empty_lines=empty_lines))
447
448            _, headers, body = self._parse_request(conn.sock.data)
449            body = self._parse_chunked(body)
450            self.assertEqual(body, self.expected_body)
451            self.assertEqual(headers['Transfer-Encoding'], 'chunked')
452
453            # Content-Length and Transfer-Encoding SHOULD not be sent in the
454            # same request
455            self.assertNotIn('content-length', [k.lower() for k in headers])
456
457    def test_empty_body(self):
458        # Zero-length iterable should be treated like any other iterable
459        conn = client.HTTPConnection('example.com')
460        conn.sock = FakeSocket(b'')
461        conn.request('POST', '/', ())
462        _, headers, body = self._parse_request(conn.sock.data)
463        self.assertEqual(headers['Transfer-Encoding'], 'chunked')
464        self.assertNotIn('content-length', [k.lower() for k in headers])
465        self.assertEqual(body, b"0\r\n\r\n")
466
467    def _make_body(self, empty_lines=False):
468        lines = self.expected_body.split(b' ')
469        for idx, line in enumerate(lines):
470            # for testing handling empty lines
471            if empty_lines and idx % 2:
472                yield b''
473            if idx < len(lines) - 1:
474                yield line + b' '
475            else:
476                yield line
477
478    def _parse_request(self, data):
479        lines = data.split(b'\r\n')
480        request = lines[0]
481        headers = {}
482        n = 1
483        while n < len(lines) and len(lines[n]) > 0:
484            key, val = lines[n].split(b':')
485            key = key.decode('latin-1').strip()
486            headers[key] = val.decode('latin-1').strip()
487            n += 1
488
489        return request, headers, b'\r\n'.join(lines[n + 1:])
490
491    def _parse_chunked(self, data):
492        body = []
493        trailers = {}
494        n = 0
495        lines = data.split(b'\r\n')
496        # parse body
497        while True:
498            size, chunk = lines[n:n+2]
499            size = int(size, 16)
500
501            if size == 0:
502                n += 1
503                break
504
505            self.assertEqual(size, len(chunk))
506            body.append(chunk)
507
508            n += 2
509            # we /should/ hit the end chunk, but check against the size of
510            # lines so we're not stuck in an infinite loop should we get
511            # malformed data
512            if n > len(lines):
513                break
514
515        return b''.join(body)
516
517
518class BasicTest(TestCase):
519    def test_status_lines(self):
520        # Test HTTP status lines
521
522        body = "HTTP/1.1 200 Ok\r\n\r\nText"
523        sock = FakeSocket(body)
524        resp = client.HTTPResponse(sock)
525        resp.begin()
526        self.assertEqual(resp.read(0), b'')  # Issue #20007
527        self.assertFalse(resp.isclosed())
528        self.assertFalse(resp.closed)
529        self.assertEqual(resp.read(), b"Text")
530        self.assertTrue(resp.isclosed())
531        self.assertFalse(resp.closed)
532        resp.close()
533        self.assertTrue(resp.closed)
534
535        body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
536        sock = FakeSocket(body)
537        resp = client.HTTPResponse(sock)
538        self.assertRaises(client.BadStatusLine, resp.begin)
539
540    def test_bad_status_repr(self):
541        exc = client.BadStatusLine('')
542        self.assertEqual(repr(exc), '''BadStatusLine("''")''')
543
544    def test_partial_reads(self):
545        # if we have Content-Length, HTTPResponse knows when to close itself,
546        # the same behaviour as when we read the whole thing with read()
547        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
548        sock = FakeSocket(body)
549        resp = client.HTTPResponse(sock)
550        resp.begin()
551        self.assertEqual(resp.read(2), b'Te')
552        self.assertFalse(resp.isclosed())
553        self.assertEqual(resp.read(2), b'xt')
554        self.assertTrue(resp.isclosed())
555        self.assertFalse(resp.closed)
556        resp.close()
557        self.assertTrue(resp.closed)
558
559    def test_mixed_reads(self):
560        # readline() should update the remaining length, so that read() knows
561        # how much data is left and does not raise IncompleteRead
562        body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother"
563        sock = FakeSocket(body)
564        resp = client.HTTPResponse(sock)
565        resp.begin()
566        self.assertEqual(resp.readline(), b'Text\r\n')
567        self.assertFalse(resp.isclosed())
568        self.assertEqual(resp.read(), b'Another')
569        self.assertTrue(resp.isclosed())
570        self.assertFalse(resp.closed)
571        resp.close()
572        self.assertTrue(resp.closed)
573
574    def test_partial_readintos(self):
575        # if we have Content-Length, HTTPResponse knows when to close itself,
576        # the same behaviour as when we read the whole thing with read()
577        body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
578        sock = FakeSocket(body)
579        resp = client.HTTPResponse(sock)
580        resp.begin()
581        b = bytearray(2)
582        n = resp.readinto(b)
583        self.assertEqual(n, 2)
584        self.assertEqual(bytes(b), b'Te')
585        self.assertFalse(resp.isclosed())
586        n = resp.readinto(b)
587        self.assertEqual(n, 2)
588        self.assertEqual(bytes(b), b'xt')
589        self.assertTrue(resp.isclosed())
590        self.assertFalse(resp.closed)
591        resp.close()
592        self.assertTrue(resp.closed)
593
594    def test_partial_reads_no_content_length(self):
595        # when no length is present, the socket should be gracefully closed when
596        # all data was read
597        body = "HTTP/1.1 200 Ok\r\n\r\nText"
598        sock = FakeSocket(body)
599        resp = client.HTTPResponse(sock)
600        resp.begin()
601        self.assertEqual(resp.read(2), b'Te')
602        self.assertFalse(resp.isclosed())
603        self.assertEqual(resp.read(2), b'xt')
604        self.assertEqual(resp.read(1), b'')
605        self.assertTrue(resp.isclosed())
606        self.assertFalse(resp.closed)
607        resp.close()
608        self.assertTrue(resp.closed)
609
610    def test_partial_readintos_no_content_length(self):
611        # when no length is present, the socket should be gracefully closed when
612        # all data was read
613        body = "HTTP/1.1 200 Ok\r\n\r\nText"
614        sock = FakeSocket(body)
615        resp = client.HTTPResponse(sock)
616        resp.begin()
617        b = bytearray(2)
618        n = resp.readinto(b)
619        self.assertEqual(n, 2)
620        self.assertEqual(bytes(b), b'Te')
621        self.assertFalse(resp.isclosed())
622        n = resp.readinto(b)
623        self.assertEqual(n, 2)
624        self.assertEqual(bytes(b), b'xt')
625        n = resp.readinto(b)
626        self.assertEqual(n, 0)
627        self.assertTrue(resp.isclosed())
628
629    def test_partial_reads_incomplete_body(self):
630        # if the server shuts down the connection before the whole
631        # content-length is delivered, the socket is gracefully closed
632        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
633        sock = FakeSocket(body)
634        resp = client.HTTPResponse(sock)
635        resp.begin()
636        self.assertEqual(resp.read(2), b'Te')
637        self.assertFalse(resp.isclosed())
638        self.assertEqual(resp.read(2), b'xt')
639        self.assertEqual(resp.read(1), b'')
640        self.assertTrue(resp.isclosed())
641
642    def test_partial_readintos_incomplete_body(self):
643        # if the server shuts down the connection before the whole
644        # content-length is delivered, the socket is gracefully closed
645        body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
646        sock = FakeSocket(body)
647        resp = client.HTTPResponse(sock)
648        resp.begin()
649        b = bytearray(2)
650        n = resp.readinto(b)
651        self.assertEqual(n, 2)
652        self.assertEqual(bytes(b), b'Te')
653        self.assertFalse(resp.isclosed())
654        n = resp.readinto(b)
655        self.assertEqual(n, 2)
656        self.assertEqual(bytes(b), b'xt')
657        n = resp.readinto(b)
658        self.assertEqual(n, 0)
659        self.assertTrue(resp.isclosed())
660        self.assertFalse(resp.closed)
661        resp.close()
662        self.assertTrue(resp.closed)
663
664    def test_host_port(self):
665        # Check invalid host_port
666
667        for hp in ("www.python.org:abc", "user:password@www.python.org"):
668            self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
669
670        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
671                          "fe80::207:e9ff:fe9b", 8000),
672                         ("www.python.org:80", "www.python.org", 80),
673                         ("www.python.org:", "www.python.org", 80),
674                         ("www.python.org", "www.python.org", 80),
675                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80),
676                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)):
677            c = client.HTTPConnection(hp)
678            self.assertEqual(h, c.host)
679            self.assertEqual(p, c.port)
680
681    def test_response_headers(self):
682        # test response with multiple message headers with the same field name.
683        text = ('HTTP/1.1 200 OK\r\n'
684                'Set-Cookie: Customer="WILE_E_COYOTE"; '
685                'Version="1"; Path="/acme"\r\n'
686                'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
687                ' Path="/acme"\r\n'
688                '\r\n'
689                'No body\r\n')
690        hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
691               ', '
692               'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
693        s = FakeSocket(text)
694        r = client.HTTPResponse(s)
695        r.begin()
696        cookies = r.getheader("Set-Cookie")
697        self.assertEqual(cookies, hdr)
698
699    def test_read_head(self):
700        # Test that the library doesn't attempt to read any data
701        # from a HEAD request.  (Tickles SF bug #622042.)
702        sock = FakeSocket(
703            'HTTP/1.1 200 OK\r\n'
704            'Content-Length: 14432\r\n'
705            '\r\n',
706            NoEOFBytesIO)
707        resp = client.HTTPResponse(sock, method="HEAD")
708        resp.begin()
709        if resp.read():
710            self.fail("Did not expect response from HEAD request")
711
712    def test_readinto_head(self):
713        # Test that the library doesn't attempt to read any data
714        # from a HEAD request.  (Tickles SF bug #622042.)
715        sock = FakeSocket(
716            'HTTP/1.1 200 OK\r\n'
717            'Content-Length: 14432\r\n'
718            '\r\n',
719            NoEOFBytesIO)
720        resp = client.HTTPResponse(sock, method="HEAD")
721        resp.begin()
722        b = bytearray(5)
723        if resp.readinto(b) != 0:
724            self.fail("Did not expect response from HEAD request")
725        self.assertEqual(bytes(b), b'\x00'*5)
726
727    def test_too_many_headers(self):
728        headers = '\r\n'.join('Header%d: foo' % i
729                              for i in range(client._MAXHEADERS + 1)) + '\r\n'
730        text = ('HTTP/1.1 200 OK\r\n' + headers)
731        s = FakeSocket(text)
732        r = client.HTTPResponse(s)
733        self.assertRaisesRegex(client.HTTPException,
734                               r"got more than \d+ headers", r.begin)
735
736    def test_send_file(self):
737        expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
738                    b'Accept-Encoding: identity\r\n'
739                    b'Transfer-Encoding: chunked\r\n'
740                    b'\r\n')
741
742        with open(__file__, 'rb') as body:
743            conn = client.HTTPConnection('example.com')
744            sock = FakeSocket(body)
745            conn.sock = sock
746            conn.request('GET', '/foo', body)
747            self.assertTrue(sock.data.startswith(expected), '%r != %r' %
748                    (sock.data[:len(expected)], expected))
749
750    def test_send(self):
751        expected = b'this is a test this is only a test'
752        conn = client.HTTPConnection('example.com')
753        sock = FakeSocket(None)
754        conn.sock = sock
755        conn.send(expected)
756        self.assertEqual(expected, sock.data)
757        sock.data = b''
758        conn.send(array.array('b', expected))
759        self.assertEqual(expected, sock.data)
760        sock.data = b''
761        conn.send(io.BytesIO(expected))
762        self.assertEqual(expected, sock.data)
763
764    def test_send_updating_file(self):
765        def data():
766            yield 'data'
767            yield None
768            yield 'data_two'
769
770        class UpdatingFile(io.TextIOBase):
771            mode = 'r'
772            d = data()
773            def read(self, blocksize=-1):
774                return next(self.d)
775
776        expected = b'data'
777
778        conn = client.HTTPConnection('example.com')
779        sock = FakeSocket("")
780        conn.sock = sock
781        conn.send(UpdatingFile())
782        self.assertEqual(sock.data, expected)
783
784
785    def test_send_iter(self):
786        expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \
787                   b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \
788                   b'\r\nonetwothree'
789
790        def body():
791            yield b"one"
792            yield b"two"
793            yield b"three"
794
795        conn = client.HTTPConnection('example.com')
796        sock = FakeSocket("")
797        conn.sock = sock
798        conn.request('GET', '/foo', body(), {'Content-Length': '11'})
799        self.assertEqual(sock.data, expected)
800
801    def test_blocksize_request(self):
802        """Check that request() respects the configured block size."""
803        blocksize = 8  # For easy debugging.
804        conn = client.HTTPConnection('example.com', blocksize=blocksize)
805        sock = FakeSocket(None)
806        conn.sock = sock
807        expected = b"a" * blocksize + b"b"
808        conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"})
809        self.assertEqual(sock.sendall_calls, 3)
810        body = sock.data.split(b"\r\n\r\n", 1)[1]
811        self.assertEqual(body, expected)
812
813    def test_blocksize_send(self):
814        """Check that send() respects the configured block size."""
815        blocksize = 8  # For easy debugging.
816        conn = client.HTTPConnection('example.com', blocksize=blocksize)
817        sock = FakeSocket(None)
818        conn.sock = sock
819        expected = b"a" * blocksize + b"b"
820        conn.send(io.BytesIO(expected))
821        self.assertEqual(sock.sendall_calls, 2)
822        self.assertEqual(sock.data, expected)
823
824    def test_send_type_error(self):
825        # See: Issue #12676
826        conn = client.HTTPConnection('example.com')
827        conn.sock = FakeSocket('')
828        with self.assertRaises(TypeError):
829            conn.request('POST', 'test', conn)
830
831    def test_chunked(self):
832        expected = chunked_expected
833        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
834        resp = client.HTTPResponse(sock, method="GET")
835        resp.begin()
836        self.assertEqual(resp.read(), expected)
837        resp.close()
838
839        # Various read sizes
840        for n in range(1, 12):
841            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
842            resp = client.HTTPResponse(sock, method="GET")
843            resp.begin()
844            self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected)
845            resp.close()
846
847        for x in ('', 'foo\r\n'):
848            sock = FakeSocket(chunked_start + x)
849            resp = client.HTTPResponse(sock, method="GET")
850            resp.begin()
851            try:
852                resp.read()
853            except client.IncompleteRead as i:
854                self.assertEqual(i.partial, expected)
855                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
856                self.assertEqual(repr(i), expected_message)
857                self.assertEqual(str(i), expected_message)
858            else:
859                self.fail('IncompleteRead expected')
860            finally:
861                resp.close()
862
863    def test_readinto_chunked(self):
864
865        expected = chunked_expected
866        nexpected = len(expected)
867        b = bytearray(128)
868
869        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
870        resp = client.HTTPResponse(sock, method="GET")
871        resp.begin()
872        n = resp.readinto(b)
873        self.assertEqual(b[:nexpected], expected)
874        self.assertEqual(n, nexpected)
875        resp.close()
876
877        # Various read sizes
878        for n in range(1, 12):
879            sock = FakeSocket(chunked_start + last_chunk + chunked_end)
880            resp = client.HTTPResponse(sock, method="GET")
881            resp.begin()
882            m = memoryview(b)
883            i = resp.readinto(m[0:n])
884            i += resp.readinto(m[i:n + i])
885            i += resp.readinto(m[i:])
886            self.assertEqual(b[:nexpected], expected)
887            self.assertEqual(i, nexpected)
888            resp.close()
889
890        for x in ('', 'foo\r\n'):
891            sock = FakeSocket(chunked_start + x)
892            resp = client.HTTPResponse(sock, method="GET")
893            resp.begin()
894            try:
895                n = resp.readinto(b)
896            except client.IncompleteRead as i:
897                self.assertEqual(i.partial, expected)
898                expected_message = 'IncompleteRead(%d bytes read)' % len(expected)
899                self.assertEqual(repr(i), expected_message)
900                self.assertEqual(str(i), expected_message)
901            else:
902                self.fail('IncompleteRead expected')
903            finally:
904                resp.close()
905
906    def test_chunked_head(self):
907        chunked_start = (
908            'HTTP/1.1 200 OK\r\n'
909            'Transfer-Encoding: chunked\r\n\r\n'
910            'a\r\n'
911            'hello world\r\n'
912            '1\r\n'
913            'd\r\n'
914        )
915        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
916        resp = client.HTTPResponse(sock, method="HEAD")
917        resp.begin()
918        self.assertEqual(resp.read(), b'')
919        self.assertEqual(resp.status, 200)
920        self.assertEqual(resp.reason, 'OK')
921        self.assertTrue(resp.isclosed())
922        self.assertFalse(resp.closed)
923        resp.close()
924        self.assertTrue(resp.closed)
925
926    def test_readinto_chunked_head(self):
927        chunked_start = (
928            'HTTP/1.1 200 OK\r\n'
929            'Transfer-Encoding: chunked\r\n\r\n'
930            'a\r\n'
931            'hello world\r\n'
932            '1\r\n'
933            'd\r\n'
934        )
935        sock = FakeSocket(chunked_start + last_chunk + chunked_end)
936        resp = client.HTTPResponse(sock, method="HEAD")
937        resp.begin()
938        b = bytearray(5)
939        n = resp.readinto(b)
940        self.assertEqual(n, 0)
941        self.assertEqual(bytes(b), b'\x00'*5)
942        self.assertEqual(resp.status, 200)
943        self.assertEqual(resp.reason, 'OK')
944        self.assertTrue(resp.isclosed())
945        self.assertFalse(resp.closed)
946        resp.close()
947        self.assertTrue(resp.closed)
948
949    def test_negative_content_length(self):
950        sock = FakeSocket(
951            'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
952        resp = client.HTTPResponse(sock, method="GET")
953        resp.begin()
954        self.assertEqual(resp.read(), b'Hello\r\n')
955        self.assertTrue(resp.isclosed())
956
957    def test_incomplete_read(self):
958        sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
959        resp = client.HTTPResponse(sock, method="GET")
960        resp.begin()
961        try:
962            resp.read()
963        except client.IncompleteRead as i:
964            self.assertEqual(i.partial, b'Hello\r\n')
965            self.assertEqual(repr(i),
966                             "IncompleteRead(7 bytes read, 3 more expected)")
967            self.assertEqual(str(i),
968                             "IncompleteRead(7 bytes read, 3 more expected)")
969            self.assertTrue(resp.isclosed())
970        else:
971            self.fail('IncompleteRead expected')
972
973    def test_epipe(self):
974        sock = EPipeSocket(
975            "HTTP/1.0 401 Authorization Required\r\n"
976            "Content-type: text/html\r\n"
977            "WWW-Authenticate: Basic realm=\"example\"\r\n",
978            b"Content-Length")
979        conn = client.HTTPConnection("example.com")
980        conn.sock = sock
981        self.assertRaises(OSError,
982                          lambda: conn.request("PUT", "/url", "body"))
983        resp = conn.getresponse()
984        self.assertEqual(401, resp.status)
985        self.assertEqual("Basic realm=\"example\"",
986                         resp.getheader("www-authenticate"))
987
988    # Test lines overflowing the max line size (_MAXLINE in http.client)
989
990    def test_overflowing_status_line(self):
991        body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n"
992        resp = client.HTTPResponse(FakeSocket(body))
993        self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin)
994
995    def test_overflowing_header_line(self):
996        body = (
997            'HTTP/1.1 200 OK\r\n'
998            'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n'
999        )
1000        resp = client.HTTPResponse(FakeSocket(body))
1001        self.assertRaises(client.LineTooLong, resp.begin)
1002
1003    def test_overflowing_chunked_line(self):
1004        body = (
1005            'HTTP/1.1 200 OK\r\n'
1006            'Transfer-Encoding: chunked\r\n\r\n'
1007            + '0' * 65536 + 'a\r\n'
1008            'hello world\r\n'
1009            '0\r\n'
1010            '\r\n'
1011        )
1012        resp = client.HTTPResponse(FakeSocket(body))
1013        resp.begin()
1014        self.assertRaises(client.LineTooLong, resp.read)
1015
1016    def test_early_eof(self):
1017        # Test httpresponse with no \r\n termination,
1018        body = "HTTP/1.1 200 Ok"
1019        sock = FakeSocket(body)
1020        resp = client.HTTPResponse(sock)
1021        resp.begin()
1022        self.assertEqual(resp.read(), b'')
1023        self.assertTrue(resp.isclosed())
1024        self.assertFalse(resp.closed)
1025        resp.close()
1026        self.assertTrue(resp.closed)
1027
1028    def test_error_leak(self):
1029        # Test that the socket is not leaked if getresponse() fails
1030        conn = client.HTTPConnection('example.com')
1031        response = None
1032        class Response(client.HTTPResponse):
1033            def __init__(self, *pos, **kw):
1034                nonlocal response
1035                response = self  # Avoid garbage collector closing the socket
1036                client.HTTPResponse.__init__(self, *pos, **kw)
1037        conn.response_class = Response
1038        conn.sock = FakeSocket('Invalid status line')
1039        conn.request('GET', '/')
1040        self.assertRaises(client.BadStatusLine, conn.getresponse)
1041        self.assertTrue(response.closed)
1042        self.assertTrue(conn.sock.file_closed)
1043
1044    def test_chunked_extension(self):
1045        extra = '3;foo=bar\r\n' + 'abc\r\n'
1046        expected = chunked_expected + b'abc'
1047
1048        sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end)
1049        resp = client.HTTPResponse(sock, method="GET")
1050        resp.begin()
1051        self.assertEqual(resp.read(), expected)
1052        resp.close()
1053
1054    def test_chunked_missing_end(self):
1055        """some servers may serve up a short chunked encoding stream"""
1056        expected = chunked_expected
1057        sock = FakeSocket(chunked_start + last_chunk)  #no terminating crlf
1058        resp = client.HTTPResponse(sock, method="GET")
1059        resp.begin()
1060        self.assertEqual(resp.read(), expected)
1061        resp.close()
1062
1063    def test_chunked_trailers(self):
1064        """See that trailers are read and ignored"""
1065        expected = chunked_expected
1066        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end)
1067        resp = client.HTTPResponse(sock, method="GET")
1068        resp.begin()
1069        self.assertEqual(resp.read(), expected)
1070        # we should have reached the end of the file
1071        self.assertEqual(sock.file.read(), b"") #we read to the end
1072        resp.close()
1073
1074    def test_chunked_sync(self):
1075        """Check that we don't read past the end of the chunked-encoding stream"""
1076        expected = chunked_expected
1077        extradata = "extradata"
1078        sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata)
1079        resp = client.HTTPResponse(sock, method="GET")
1080        resp.begin()
1081        self.assertEqual(resp.read(), expected)
1082        # the file should now have our extradata ready to be read
1083        self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end
1084        resp.close()
1085
1086    def test_content_length_sync(self):
1087        """Check that we don't read past the end of the Content-Length stream"""
1088        extradata = b"extradata"
1089        expected = b"Hello123\r\n"
1090        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1091        resp = client.HTTPResponse(sock, method="GET")
1092        resp.begin()
1093        self.assertEqual(resp.read(), expected)
1094        # the file should now have our extradata ready to be read
1095        self.assertEqual(sock.file.read(), extradata) #we read to the end
1096        resp.close()
1097
1098    def test_readlines_content_length(self):
1099        extradata = b"extradata"
1100        expected = b"Hello123\r\n"
1101        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1102        resp = client.HTTPResponse(sock, method="GET")
1103        resp.begin()
1104        self.assertEqual(resp.readlines(2000), [expected])
1105        # the file should now have our extradata ready to be read
1106        self.assertEqual(sock.file.read(), extradata) #we read to the end
1107        resp.close()
1108
1109    def test_read1_content_length(self):
1110        extradata = b"extradata"
1111        expected = b"Hello123\r\n"
1112        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1113        resp = client.HTTPResponse(sock, method="GET")
1114        resp.begin()
1115        self.assertEqual(resp.read1(2000), expected)
1116        # the file should now have our extradata ready to be read
1117        self.assertEqual(sock.file.read(), extradata) #we read to the end
1118        resp.close()
1119
1120    def test_readline_bound_content_length(self):
1121        extradata = b"extradata"
1122        expected = b"Hello123\r\n"
1123        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata)
1124        resp = client.HTTPResponse(sock, method="GET")
1125        resp.begin()
1126        self.assertEqual(resp.readline(10), expected)
1127        self.assertEqual(resp.readline(10), b"")
1128        # the file should now have our extradata ready to be read
1129        self.assertEqual(sock.file.read(), extradata) #we read to the end
1130        resp.close()
1131
1132    def test_read1_bound_content_length(self):
1133        extradata = b"extradata"
1134        expected = b"Hello123\r\n"
1135        sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata)
1136        resp = client.HTTPResponse(sock, method="GET")
1137        resp.begin()
1138        self.assertEqual(resp.read1(20), expected*2)
1139        self.assertEqual(resp.read(), expected)
1140        # the file should now have our extradata ready to be read
1141        self.assertEqual(sock.file.read(), extradata) #we read to the end
1142        resp.close()
1143
1144    def test_response_fileno(self):
1145        # Make sure fd returned by fileno is valid.
1146        serv = socket.create_server((HOST, 0))
1147        self.addCleanup(serv.close)
1148
1149        result = None
1150        def run_server():
1151            [conn, address] = serv.accept()
1152            with conn, conn.makefile("rb") as reader:
1153                # Read the request header until a blank line
1154                while True:
1155                    line = reader.readline()
1156                    if not line.rstrip(b"\r\n"):
1157                        break
1158                conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n")
1159                nonlocal result
1160                result = reader.read()
1161
1162        thread = threading.Thread(target=run_server)
1163        thread.start()
1164        self.addCleanup(thread.join, float(1))
1165        conn = client.HTTPConnection(*serv.getsockname())
1166        conn.request("CONNECT", "dummy:1234")
1167        response = conn.getresponse()
1168        try:
1169            self.assertEqual(response.status, client.OK)
1170            s = socket.socket(fileno=response.fileno())
1171            try:
1172                s.sendall(b"proxied data\n")
1173            finally:
1174                s.detach()
1175        finally:
1176            response.close()
1177            conn.close()
1178        thread.join()
1179        self.assertEqual(result, b"proxied data\n")
1180
1181    def test_putrequest_override_domain_validation(self):
1182        """
1183        It should be possible to override the default validation
1184        behavior in putrequest (bpo-38216).
1185        """
1186        class UnsafeHTTPConnection(client.HTTPConnection):
1187            def _validate_path(self, url):
1188                pass
1189
1190        conn = UnsafeHTTPConnection('example.com')
1191        conn.sock = FakeSocket('')
1192        conn.putrequest('GET', '/\x00')
1193
1194    def test_putrequest_override_host_validation(self):
1195        class UnsafeHTTPConnection(client.HTTPConnection):
1196            def _validate_host(self, url):
1197                pass
1198
1199        conn = UnsafeHTTPConnection('example.com\r\n')
1200        conn.sock = FakeSocket('')
1201        # set skip_host so a ValueError is not raised upon adding the
1202        # invalid URL as the value of the "Host:" header
1203        conn.putrequest('GET', '/', skip_host=1)
1204
1205    def test_putrequest_override_encoding(self):
1206        """
1207        It should be possible to override the default encoding
1208        to transmit bytes in another encoding even if invalid
1209        (bpo-36274).
1210        """
1211        class UnsafeHTTPConnection(client.HTTPConnection):
1212            def _encode_request(self, str_url):
1213                return str_url.encode('utf-8')
1214
1215        conn = UnsafeHTTPConnection('example.com')
1216        conn.sock = FakeSocket('')
1217        conn.putrequest('GET', '/☃')
1218
1219
1220class ExtendedReadTest(TestCase):
1221    """
1222    Test peek(), read1(), readline()
1223    """
1224    lines = (
1225        'HTTP/1.1 200 OK\r\n'
1226        '\r\n'
1227        'hello world!\n'
1228        'and now \n'
1229        'for something completely different\n'
1230        'foo'
1231        )
1232    lines_expected = lines[lines.find('hello'):].encode("ascii")
1233    lines_chunked = (
1234        'HTTP/1.1 200 OK\r\n'
1235        'Transfer-Encoding: chunked\r\n\r\n'
1236        'a\r\n'
1237        'hello worl\r\n'
1238        '3\r\n'
1239        'd!\n\r\n'
1240        '9\r\n'
1241        'and now \n\r\n'
1242        '23\r\n'
1243        'for something completely different\n\r\n'
1244        '3\r\n'
1245        'foo\r\n'
1246        '0\r\n' # terminating chunk
1247        '\r\n'  # end of trailers
1248    )
1249
1250    def setUp(self):
1251        sock = FakeSocket(self.lines)
1252        resp = client.HTTPResponse(sock, method="GET")
1253        resp.begin()
1254        resp.fp = io.BufferedReader(resp.fp)
1255        self.resp = resp
1256
1257
1258
1259    def test_peek(self):
1260        resp = self.resp
1261        # patch up the buffered peek so that it returns not too much stuff
1262        oldpeek = resp.fp.peek
1263        def mypeek(n=-1):
1264            p = oldpeek(n)
1265            if n >= 0:
1266                return p[:n]
1267            return p[:10]
1268        resp.fp.peek = mypeek
1269
1270        all = []
1271        while True:
1272            # try a short peek
1273            p = resp.peek(3)
1274            if p:
1275                self.assertGreater(len(p), 0)
1276                # then unbounded peek
1277                p2 = resp.peek()
1278                self.assertGreaterEqual(len(p2), len(p))
1279                self.assertTrue(p2.startswith(p))
1280                next = resp.read(len(p2))
1281                self.assertEqual(next, p2)
1282            else:
1283                next = resp.read()
1284                self.assertFalse(next)
1285            all.append(next)
1286            if not next:
1287                break
1288        self.assertEqual(b"".join(all), self.lines_expected)
1289
1290    def test_readline(self):
1291        resp = self.resp
1292        self._verify_readline(self.resp.readline, self.lines_expected)
1293
1294    def _verify_readline(self, readline, expected):
1295        all = []
1296        while True:
1297            # short readlines
1298            line = readline(5)
1299            if line and line != b"foo":
1300                if len(line) < 5:
1301                    self.assertTrue(line.endswith(b"\n"))
1302            all.append(line)
1303            if not line:
1304                break
1305        self.assertEqual(b"".join(all), expected)
1306
1307    def test_read1(self):
1308        resp = self.resp
1309        def r():
1310            res = resp.read1(4)
1311            self.assertLessEqual(len(res), 4)
1312            return res
1313        readliner = Readliner(r)
1314        self._verify_readline(readliner.readline, self.lines_expected)
1315
1316    def test_read1_unbounded(self):
1317        resp = self.resp
1318        all = []
1319        while True:
1320            data = resp.read1()
1321            if not data:
1322                break
1323            all.append(data)
1324        self.assertEqual(b"".join(all), self.lines_expected)
1325
1326    def test_read1_bounded(self):
1327        resp = self.resp
1328        all = []
1329        while True:
1330            data = resp.read1(10)
1331            if not data:
1332                break
1333            self.assertLessEqual(len(data), 10)
1334            all.append(data)
1335        self.assertEqual(b"".join(all), self.lines_expected)
1336
1337    def test_read1_0(self):
1338        self.assertEqual(self.resp.read1(0), b"")
1339
1340    def test_peek_0(self):
1341        p = self.resp.peek(0)
1342        self.assertLessEqual(0, len(p))
1343
1344
1345class ExtendedReadTestChunked(ExtendedReadTest):
1346    """
1347    Test peek(), read1(), readline() in chunked mode
1348    """
1349    lines = (
1350        'HTTP/1.1 200 OK\r\n'
1351        'Transfer-Encoding: chunked\r\n\r\n'
1352        'a\r\n'
1353        'hello worl\r\n'
1354        '3\r\n'
1355        'd!\n\r\n'
1356        '9\r\n'
1357        'and now \n\r\n'
1358        '23\r\n'
1359        'for something completely different\n\r\n'
1360        '3\r\n'
1361        'foo\r\n'
1362        '0\r\n' # terminating chunk
1363        '\r\n'  # end of trailers
1364    )
1365
1366
1367class Readliner:
1368    """
1369    a simple readline class that uses an arbitrary read function and buffering
1370    """
1371    def __init__(self, readfunc):
1372        self.readfunc = readfunc
1373        self.remainder = b""
1374
1375    def readline(self, limit):
1376        data = []
1377        datalen = 0
1378        read = self.remainder
1379        try:
1380            while True:
1381                idx = read.find(b'\n')
1382                if idx != -1:
1383                    break
1384                if datalen + len(read) >= limit:
1385                    idx = limit - datalen - 1
1386                # read more data
1387                data.append(read)
1388                read = self.readfunc()
1389                if not read:
1390                    idx = 0 #eof condition
1391                    break
1392            idx += 1
1393            data.append(read[:idx])
1394            self.remainder = read[idx:]
1395            return b"".join(data)
1396        except:
1397            self.remainder = b"".join(data)
1398            raise
1399
1400
1401class OfflineTest(TestCase):
1402    def test_all(self):
1403        # Documented objects defined in the module should be in __all__
1404        expected = {"responses"}  # White-list documented dict() object
1405        # HTTPMessage, parse_headers(), and the HTTP status code constants are
1406        # intentionally omitted for simplicity
1407        blacklist = {"HTTPMessage", "parse_headers"}
1408        for name in dir(client):
1409            if name.startswith("_") or name in blacklist:
1410                continue
1411            module_object = getattr(client, name)
1412            if getattr(module_object, "__module__", None) == "http.client":
1413                expected.add(name)
1414        self.assertCountEqual(client.__all__, expected)
1415
1416    def test_responses(self):
1417        self.assertEqual(client.responses[client.NOT_FOUND], "Not Found")
1418
1419    def test_client_constants(self):
1420        # Make sure we don't break backward compatibility with 3.4
1421        expected = [
1422            'CONTINUE',
1423            'SWITCHING_PROTOCOLS',
1424            'PROCESSING',
1425            'OK',
1426            'CREATED',
1427            'ACCEPTED',
1428            'NON_AUTHORITATIVE_INFORMATION',
1429            'NO_CONTENT',
1430            'RESET_CONTENT',
1431            'PARTIAL_CONTENT',
1432            'MULTI_STATUS',
1433            'IM_USED',
1434            'MULTIPLE_CHOICES',
1435            'MOVED_PERMANENTLY',
1436            'FOUND',
1437            'SEE_OTHER',
1438            'NOT_MODIFIED',
1439            'USE_PROXY',
1440            'TEMPORARY_REDIRECT',
1441            'BAD_REQUEST',
1442            'UNAUTHORIZED',
1443            'PAYMENT_REQUIRED',
1444            'FORBIDDEN',
1445            'NOT_FOUND',
1446            'METHOD_NOT_ALLOWED',
1447            'NOT_ACCEPTABLE',
1448            'PROXY_AUTHENTICATION_REQUIRED',
1449            'REQUEST_TIMEOUT',
1450            'CONFLICT',
1451            'GONE',
1452            'LENGTH_REQUIRED',
1453            'PRECONDITION_FAILED',
1454            'REQUEST_ENTITY_TOO_LARGE',
1455            'REQUEST_URI_TOO_LONG',
1456            'UNSUPPORTED_MEDIA_TYPE',
1457            'REQUESTED_RANGE_NOT_SATISFIABLE',
1458            'EXPECTATION_FAILED',
1459            'IM_A_TEAPOT',
1460            'MISDIRECTED_REQUEST',
1461            'UNPROCESSABLE_ENTITY',
1462            'LOCKED',
1463            'FAILED_DEPENDENCY',
1464            'UPGRADE_REQUIRED',
1465            'PRECONDITION_REQUIRED',
1466            'TOO_MANY_REQUESTS',
1467            'REQUEST_HEADER_FIELDS_TOO_LARGE',
1468            'UNAVAILABLE_FOR_LEGAL_REASONS',
1469            'INTERNAL_SERVER_ERROR',
1470            'NOT_IMPLEMENTED',
1471            'BAD_GATEWAY',
1472            'SERVICE_UNAVAILABLE',
1473            'GATEWAY_TIMEOUT',
1474            'HTTP_VERSION_NOT_SUPPORTED',
1475            'INSUFFICIENT_STORAGE',
1476            'NOT_EXTENDED',
1477            'NETWORK_AUTHENTICATION_REQUIRED',
1478            'EARLY_HINTS',
1479            'TOO_EARLY'
1480        ]
1481        for const in expected:
1482            with self.subTest(constant=const):
1483                self.assertTrue(hasattr(client, const))
1484
1485
1486class SourceAddressTest(TestCase):
1487    def setUp(self):
1488        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1489        self.port = socket_helper.bind_port(self.serv)
1490        self.source_port = socket_helper.find_unused_port()
1491        self.serv.listen()
1492        self.conn = None
1493
1494    def tearDown(self):
1495        if self.conn:
1496            self.conn.close()
1497            self.conn = None
1498        self.serv.close()
1499        self.serv = None
1500
1501    def testHTTPConnectionSourceAddress(self):
1502        self.conn = client.HTTPConnection(HOST, self.port,
1503                source_address=('', self.source_port))
1504        self.conn.connect()
1505        self.assertEqual(self.conn.sock.getsockname()[1], self.source_port)
1506
1507    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
1508                     'http.client.HTTPSConnection not defined')
1509    def testHTTPSConnectionSourceAddress(self):
1510        self.conn = client.HTTPSConnection(HOST, self.port,
1511                source_address=('', self.source_port))
1512        # We don't test anything here other than the constructor not barfing as
1513        # this code doesn't deal with setting up an active running SSL server
1514        # for an ssl_wrapped connect() to actually return from.
1515
1516
1517class TimeoutTest(TestCase):
1518    PORT = None
1519
1520    def setUp(self):
1521        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1522        TimeoutTest.PORT = socket_helper.bind_port(self.serv)
1523        self.serv.listen()
1524
1525    def tearDown(self):
1526        self.serv.close()
1527        self.serv = None
1528
1529    def testTimeoutAttribute(self):
1530        # This will prove that the timeout gets through HTTPConnection
1531        # and into the socket.
1532
1533        # default -- use global socket timeout
1534        self.assertIsNone(socket.getdefaulttimeout())
1535        socket.setdefaulttimeout(30)
1536        try:
1537            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
1538            httpConn.connect()
1539        finally:
1540            socket.setdefaulttimeout(None)
1541        self.assertEqual(httpConn.sock.gettimeout(), 30)
1542        httpConn.close()
1543
1544        # no timeout -- do not use global socket default
1545        self.assertIsNone(socket.getdefaulttimeout())
1546        socket.setdefaulttimeout(30)
1547        try:
1548            httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
1549                                              timeout=None)
1550            httpConn.connect()
1551        finally:
1552            socket.setdefaulttimeout(None)
1553        self.assertEqual(httpConn.sock.gettimeout(), None)
1554        httpConn.close()
1555
1556        # a value
1557        httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
1558        httpConn.connect()
1559        self.assertEqual(httpConn.sock.gettimeout(), 30)
1560        httpConn.close()
1561
1562
1563class PersistenceTest(TestCase):
1564
1565    def test_reuse_reconnect(self):
1566        # Should reuse or reconnect depending on header from server
1567        tests = (
1568            ('1.0', '', False),
1569            ('1.0', 'Connection: keep-alive\r\n', True),
1570            ('1.1', '', True),
1571            ('1.1', 'Connection: close\r\n', False),
1572            ('1.0', 'Connection: keep-ALIVE\r\n', True),
1573            ('1.1', 'Connection: cloSE\r\n', False),
1574        )
1575        for version, header, reuse in tests:
1576            with self.subTest(version=version, header=header):
1577                msg = (
1578                    'HTTP/{} 200 OK\r\n'
1579                    '{}'
1580                    'Content-Length: 12\r\n'
1581                    '\r\n'
1582                    'Dummy body\r\n'
1583                ).format(version, header)
1584                conn = FakeSocketHTTPConnection(msg)
1585                self.assertIsNone(conn.sock)
1586                conn.request('GET', '/open-connection')
1587                with conn.getresponse() as response:
1588                    self.assertEqual(conn.sock is None, not reuse)
1589                    response.read()
1590                self.assertEqual(conn.sock is None, not reuse)
1591                self.assertEqual(conn.connections, 1)
1592                conn.request('GET', '/subsequent-request')
1593                self.assertEqual(conn.connections, 1 if reuse else 2)
1594
1595    def test_disconnected(self):
1596
1597        def make_reset_reader(text):
1598            """Return BufferedReader that raises ECONNRESET at EOF"""
1599            stream = io.BytesIO(text)
1600            def readinto(buffer):
1601                size = io.BytesIO.readinto(stream, buffer)
1602                if size == 0:
1603                    raise ConnectionResetError()
1604                return size
1605            stream.readinto = readinto
1606            return io.BufferedReader(stream)
1607
1608        tests = (
1609            (io.BytesIO, client.RemoteDisconnected),
1610            (make_reset_reader, ConnectionResetError),
1611        )
1612        for stream_factory, exception in tests:
1613            with self.subTest(exception=exception):
1614                conn = FakeSocketHTTPConnection(b'', stream_factory)
1615                conn.request('GET', '/eof-response')
1616                self.assertRaises(exception, conn.getresponse)
1617                self.assertIsNone(conn.sock)
1618                # HTTPConnection.connect() should be automatically invoked
1619                conn.request('GET', '/reconnect')
1620                self.assertEqual(conn.connections, 2)
1621
1622    def test_100_close(self):
1623        conn = FakeSocketHTTPConnection(
1624            b'HTTP/1.1 100 Continue\r\n'
1625            b'\r\n'
1626            # Missing final response
1627        )
1628        conn.request('GET', '/', headers={'Expect': '100-continue'})
1629        self.assertRaises(client.RemoteDisconnected, conn.getresponse)
1630        self.assertIsNone(conn.sock)
1631        conn.request('GET', '/reconnect')
1632        self.assertEqual(conn.connections, 2)
1633
1634
1635class HTTPSTest(TestCase):
1636
1637    def setUp(self):
1638        if not hasattr(client, 'HTTPSConnection'):
1639            self.skipTest('ssl support required')
1640
1641    def make_server(self, certfile):
1642        from test.ssl_servers import make_https_server
1643        return make_https_server(self, certfile=certfile)
1644
1645    def test_attributes(self):
1646        # simple test to check it's storing the timeout
1647        h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
1648        self.assertEqual(h.timeout, 30)
1649
1650    def test_networked(self):
1651        # Default settings: requires a valid cert from a trusted CA
1652        import ssl
1653        support.requires('network')
1654        with socket_helper.transient_internet('self-signed.pythontest.net'):
1655            h = client.HTTPSConnection('self-signed.pythontest.net', 443)
1656            with self.assertRaises(ssl.SSLError) as exc_info:
1657                h.request('GET', '/')
1658            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1659
1660    def test_networked_noverification(self):
1661        # Switch off cert verification
1662        import ssl
1663        support.requires('network')
1664        with socket_helper.transient_internet('self-signed.pythontest.net'):
1665            context = ssl._create_unverified_context()
1666            h = client.HTTPSConnection('self-signed.pythontest.net', 443,
1667                                       context=context)
1668            h.request('GET', '/')
1669            resp = h.getresponse()
1670            h.close()
1671            self.assertIn('nginx', resp.getheader('server'))
1672            resp.close()
1673
1674    @support.system_must_validate_cert
1675    def test_networked_trusted_by_default_cert(self):
1676        # Default settings: requires a valid cert from a trusted CA
1677        support.requires('network')
1678        with socket_helper.transient_internet('www.python.org'):
1679            h = client.HTTPSConnection('www.python.org', 443)
1680            h.request('GET', '/')
1681            resp = h.getresponse()
1682            content_type = resp.getheader('content-type')
1683            resp.close()
1684            h.close()
1685            self.assertIn('text/html', content_type)
1686
1687    def test_networked_good_cert(self):
1688        # We feed the server's cert as a validating cert
1689        import ssl
1690        support.requires('network')
1691        selfsigned_pythontestdotnet = 'self-signed.pythontest.net'
1692        with socket_helper.transient_internet(selfsigned_pythontestdotnet):
1693            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1694            self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED)
1695            self.assertEqual(context.check_hostname, True)
1696            context.load_verify_locations(CERT_selfsigned_pythontestdotnet)
1697            try:
1698                h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443,
1699                                           context=context)
1700                h.request('GET', '/')
1701                resp = h.getresponse()
1702            except ssl.SSLError as ssl_err:
1703                ssl_err_str = str(ssl_err)
1704                # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on
1705                # modern Linux distros (Debian Buster, etc) default OpenSSL
1706                # configurations it'll fail saying "key too weak" until we
1707                # address https://bugs.python.org/issue36816 to use a proper
1708                # key size on self-signed.pythontest.net.
1709                if re.search(r'(?i)key.too.weak', ssl_err_str):
1710                    raise unittest.SkipTest(
1711                        f'Got {ssl_err_str} trying to connect '
1712                        f'to {selfsigned_pythontestdotnet}. '
1713                        'See https://bugs.python.org/issue36816.')
1714                raise
1715            server_string = resp.getheader('server')
1716            resp.close()
1717            h.close()
1718            self.assertIn('nginx', server_string)
1719
1720    def test_networked_bad_cert(self):
1721        # We feed a "CA" cert that is unrelated to the server's cert
1722        import ssl
1723        support.requires('network')
1724        with socket_helper.transient_internet('self-signed.pythontest.net'):
1725            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1726            context.load_verify_locations(CERT_localhost)
1727            h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context)
1728            with self.assertRaises(ssl.SSLError) as exc_info:
1729                h.request('GET', '/')
1730            self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1731
1732    def test_local_unknown_cert(self):
1733        # The custom cert isn't known to the default trust bundle
1734        import ssl
1735        server = self.make_server(CERT_localhost)
1736        h = client.HTTPSConnection('localhost', server.port)
1737        with self.assertRaises(ssl.SSLError) as exc_info:
1738            h.request('GET', '/')
1739        self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
1740
1741    def test_local_good_hostname(self):
1742        # The (valid) cert validates the HTTP hostname
1743        import ssl
1744        server = self.make_server(CERT_localhost)
1745        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1746        context.load_verify_locations(CERT_localhost)
1747        h = client.HTTPSConnection('localhost', server.port, context=context)
1748        self.addCleanup(h.close)
1749        h.request('GET', '/nonexistent')
1750        resp = h.getresponse()
1751        self.addCleanup(resp.close)
1752        self.assertEqual(resp.status, 404)
1753
1754    def test_local_bad_hostname(self):
1755        # The (valid) cert doesn't validate the HTTP hostname
1756        import ssl
1757        server = self.make_server(CERT_fakehostname)
1758        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1759        context.load_verify_locations(CERT_fakehostname)
1760        h = client.HTTPSConnection('localhost', server.port, context=context)
1761        with self.assertRaises(ssl.CertificateError):
1762            h.request('GET', '/')
1763        # Same with explicit check_hostname=True
1764        with support.check_warnings(('', DeprecationWarning)):
1765            h = client.HTTPSConnection('localhost', server.port,
1766                                       context=context, check_hostname=True)
1767        with self.assertRaises(ssl.CertificateError):
1768            h.request('GET', '/')
1769        # With check_hostname=False, the mismatching is ignored
1770        context.check_hostname = False
1771        with support.check_warnings(('', DeprecationWarning)):
1772            h = client.HTTPSConnection('localhost', server.port,
1773                                       context=context, check_hostname=False)
1774        h.request('GET', '/nonexistent')
1775        resp = h.getresponse()
1776        resp.close()
1777        h.close()
1778        self.assertEqual(resp.status, 404)
1779        # The context's check_hostname setting is used if one isn't passed to
1780        # HTTPSConnection.
1781        context.check_hostname = False
1782        h = client.HTTPSConnection('localhost', server.port, context=context)
1783        h.request('GET', '/nonexistent')
1784        resp = h.getresponse()
1785        self.assertEqual(resp.status, 404)
1786        resp.close()
1787        h.close()
1788        # Passing check_hostname to HTTPSConnection should override the
1789        # context's setting.
1790        with support.check_warnings(('', DeprecationWarning)):
1791            h = client.HTTPSConnection('localhost', server.port,
1792                                       context=context, check_hostname=True)
1793        with self.assertRaises(ssl.CertificateError):
1794            h.request('GET', '/')
1795
1796    @unittest.skipIf(not hasattr(client, 'HTTPSConnection'),
1797                     'http.client.HTTPSConnection not available')
1798    def test_host_port(self):
1799        # Check invalid host_port
1800
1801        for hp in ("www.python.org:abc", "user:password@www.python.org"):
1802            self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp)
1803
1804        for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
1805                          "fe80::207:e9ff:fe9b", 8000),
1806                         ("www.python.org:443", "www.python.org", 443),
1807                         ("www.python.org:", "www.python.org", 443),
1808                         ("www.python.org", "www.python.org", 443),
1809                         ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443),
1810                         ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b",
1811                             443)):
1812            c = client.HTTPSConnection(hp)
1813            self.assertEqual(h, c.host)
1814            self.assertEqual(p, c.port)
1815
1816    def test_tls13_pha(self):
1817        import ssl
1818        if not ssl.HAS_TLSv1_3:
1819            self.skipTest('TLS 1.3 support required')
1820        # just check status of PHA flag
1821        h = client.HTTPSConnection('localhost', 443)
1822        self.assertTrue(h._context.post_handshake_auth)
1823
1824        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1825        self.assertFalse(context.post_handshake_auth)
1826        h = client.HTTPSConnection('localhost', 443, context=context)
1827        self.assertIs(h._context, context)
1828        self.assertFalse(h._context.post_handshake_auth)
1829
1830        with warnings.catch_warnings():
1831            warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated',
1832                                    DeprecationWarning)
1833            h = client.HTTPSConnection('localhost', 443, context=context,
1834                                       cert_file=CERT_localhost)
1835        self.assertTrue(h._context.post_handshake_auth)
1836
1837
1838class RequestBodyTest(TestCase):
1839    """Test cases where a request includes a message body."""
1840
1841    def setUp(self):
1842        self.conn = client.HTTPConnection('example.com')
1843        self.conn.sock = self.sock = FakeSocket("")
1844        self.conn.sock = self.sock
1845
1846    def get_headers_and_fp(self):
1847        f = io.BytesIO(self.sock.data)
1848        f.readline()  # read the request line
1849        message = client.parse_headers(f)
1850        return message, f
1851
1852    def test_list_body(self):
1853        # Note that no content-length is automatically calculated for
1854        # an iterable.  The request will fall back to send chunked
1855        # transfer encoding.
1856        cases = (
1857            ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
1858            ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'),
1859        )
1860        for body, expected in cases:
1861            with self.subTest(body):
1862                self.conn = client.HTTPConnection('example.com')
1863                self.conn.sock = self.sock = FakeSocket('')
1864
1865                self.conn.request('PUT', '/url', body)
1866                msg, f = self.get_headers_and_fp()
1867                self.assertNotIn('Content-Type', msg)
1868                self.assertNotIn('Content-Length', msg)
1869                self.assertEqual(msg.get('Transfer-Encoding'), 'chunked')
1870                self.assertEqual(expected, f.read())
1871
1872    def test_manual_content_length(self):
1873        # Set an incorrect content-length so that we can verify that
1874        # it will not be over-ridden by the library.
1875        self.conn.request("PUT", "/url", "body",
1876                          {"Content-Length": "42"})
1877        message, f = self.get_headers_and_fp()
1878        self.assertEqual("42", message.get("content-length"))
1879        self.assertEqual(4, len(f.read()))
1880
1881    def test_ascii_body(self):
1882        self.conn.request("PUT", "/url", "body")
1883        message, f = self.get_headers_and_fp()
1884        self.assertEqual("text/plain", message.get_content_type())
1885        self.assertIsNone(message.get_charset())
1886        self.assertEqual("4", message.get("content-length"))
1887        self.assertEqual(b'body', f.read())
1888
1889    def test_latin1_body(self):
1890        self.conn.request("PUT", "/url", "body\xc1")
1891        message, f = self.get_headers_and_fp()
1892        self.assertEqual("text/plain", message.get_content_type())
1893        self.assertIsNone(message.get_charset())
1894        self.assertEqual("5", message.get("content-length"))
1895        self.assertEqual(b'body\xc1', f.read())
1896
1897    def test_bytes_body(self):
1898        self.conn.request("PUT", "/url", b"body\xc1")
1899        message, f = self.get_headers_and_fp()
1900        self.assertEqual("text/plain", message.get_content_type())
1901        self.assertIsNone(message.get_charset())
1902        self.assertEqual("5", message.get("content-length"))
1903        self.assertEqual(b'body\xc1', f.read())
1904
1905    def test_text_file_body(self):
1906        self.addCleanup(support.unlink, support.TESTFN)
1907        with open(support.TESTFN, "w") as f:
1908            f.write("body")
1909        with open(support.TESTFN) as f:
1910            self.conn.request("PUT", "/url", f)
1911            message, f = self.get_headers_and_fp()
1912            self.assertEqual("text/plain", message.get_content_type())
1913            self.assertIsNone(message.get_charset())
1914            # No content-length will be determined for files; the body
1915            # will be sent using chunked transfer encoding instead.
1916            self.assertIsNone(message.get("content-length"))
1917            self.assertEqual("chunked", message.get("transfer-encoding"))
1918            self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read())
1919
1920    def test_binary_file_body(self):
1921        self.addCleanup(support.unlink, support.TESTFN)
1922        with open(support.TESTFN, "wb") as f:
1923            f.write(b"body\xc1")
1924        with open(support.TESTFN, "rb") as f:
1925            self.conn.request("PUT", "/url", f)
1926            message, f = self.get_headers_and_fp()
1927            self.assertEqual("text/plain", message.get_content_type())
1928            self.assertIsNone(message.get_charset())
1929            self.assertEqual("chunked", message.get("Transfer-Encoding"))
1930            self.assertNotIn("Content-Length", message)
1931            self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read())
1932
1933
1934class HTTPResponseTest(TestCase):
1935
1936    def setUp(self):
1937        body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \
1938                second-value\r\n\r\nText"
1939        sock = FakeSocket(body)
1940        self.resp = client.HTTPResponse(sock)
1941        self.resp.begin()
1942
1943    def test_getting_header(self):
1944        header = self.resp.getheader('My-Header')
1945        self.assertEqual(header, 'first-value, second-value')
1946
1947        header = self.resp.getheader('My-Header', 'some default')
1948        self.assertEqual(header, 'first-value, second-value')
1949
1950    def test_getting_nonexistent_header_with_string_default(self):
1951        header = self.resp.getheader('No-Such-Header', 'default-value')
1952        self.assertEqual(header, 'default-value')
1953
1954    def test_getting_nonexistent_header_with_iterable_default(self):
1955        header = self.resp.getheader('No-Such-Header', ['default', 'values'])
1956        self.assertEqual(header, 'default, values')
1957
1958        header = self.resp.getheader('No-Such-Header', ('default', 'values'))
1959        self.assertEqual(header, 'default, values')
1960
1961    def test_getting_nonexistent_header_without_default(self):
1962        header = self.resp.getheader('No-Such-Header')
1963        self.assertEqual(header, None)
1964
1965    def test_getting_header_defaultint(self):
1966        header = self.resp.getheader('No-Such-Header',default=42)
1967        self.assertEqual(header, 42)
1968
1969class TunnelTests(TestCase):
1970    def setUp(self):
1971        response_text = (
1972            'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
1973            'HTTP/1.1 200 OK\r\n' # Reply to HEAD
1974            'Content-Length: 42\r\n\r\n'
1975        )
1976        self.host = 'proxy.com'
1977        self.conn = client.HTTPConnection(self.host)
1978        self.conn._create_connection = self._create_connection(response_text)
1979
1980    def tearDown(self):
1981        self.conn.close()
1982
1983    def _create_connection(self, response_text):
1984        def create_connection(address, timeout=None, source_address=None):
1985            return FakeSocket(response_text, host=address[0], port=address[1])
1986        return create_connection
1987
1988    def test_set_tunnel_host_port_headers(self):
1989        tunnel_host = 'destination.com'
1990        tunnel_port = 8888
1991        tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'}
1992        self.conn.set_tunnel(tunnel_host, port=tunnel_port,
1993                             headers=tunnel_headers)
1994        self.conn.request('HEAD', '/', '')
1995        self.assertEqual(self.conn.sock.host, self.host)
1996        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
1997        self.assertEqual(self.conn._tunnel_host, tunnel_host)
1998        self.assertEqual(self.conn._tunnel_port, tunnel_port)
1999        self.assertEqual(self.conn._tunnel_headers, tunnel_headers)
2000
2001    def test_disallow_set_tunnel_after_connect(self):
2002        # Once connected, we shouldn't be able to tunnel anymore
2003        self.conn.connect()
2004        self.assertRaises(RuntimeError, self.conn.set_tunnel,
2005                          'destination.com')
2006
2007    def test_connect_with_tunnel(self):
2008        self.conn.set_tunnel('destination.com')
2009        self.conn.request('HEAD', '/', '')
2010        self.assertEqual(self.conn.sock.host, self.host)
2011        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
2012        self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
2013        # issue22095
2014        self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data)
2015        self.assertIn(b'Host: destination.com', self.conn.sock.data)
2016
2017        # This test should be removed when CONNECT gets the HTTP/1.1 blessing
2018        self.assertNotIn(b'Host: proxy.com', self.conn.sock.data)
2019
2020    def test_connect_put_request(self):
2021        self.conn.set_tunnel('destination.com')
2022        self.conn.request('PUT', '/', '')
2023        self.assertEqual(self.conn.sock.host, self.host)
2024        self.assertEqual(self.conn.sock.port, client.HTTP_PORT)
2025        self.assertIn(b'CONNECT destination.com', self.conn.sock.data)
2026        self.assertIn(b'Host: destination.com', self.conn.sock.data)
2027
2028    def test_tunnel_debuglog(self):
2029        expected_header = 'X-Dummy: 1'
2030        response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header)
2031
2032        self.conn.set_debuglevel(1)
2033        self.conn._create_connection = self._create_connection(response_text)
2034        self.conn.set_tunnel('destination.com')
2035
2036        with support.captured_stdout() as output:
2037            self.conn.request('PUT', '/', '')
2038        lines = output.getvalue().splitlines()
2039        self.assertIn('header: {}'.format(expected_header), lines)
2040
2041
2042if __name__ == '__main__':
2043    unittest.main(verbosity=2)
2044