1import errno 2from http import client 3import io 4import itertools 5import os 6import array 7import re 8import socket 9import threading 10import warnings 11 12import unittest 13TestCase = unittest.TestCase 14 15from test import support 16from test.support import socket_helper 17 18here = os.path.dirname(__file__) 19# Self-signed cert file for 'localhost' 20CERT_localhost = os.path.join(here, 'keycert.pem') 21# Self-signed cert file for 'fakehostname' 22CERT_fakehostname = os.path.join(here, 'keycert2.pem') 23# Self-signed cert file for self-signed.pythontest.net 24CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotnet.pem') 25 26# constants for testing chunked encoding 27chunked_start = ( 28 'HTTP/1.1 200 OK\r\n' 29 'Transfer-Encoding: chunked\r\n\r\n' 30 'a\r\n' 31 'hello worl\r\n' 32 '3\r\n' 33 'd! \r\n' 34 '8\r\n' 35 'and now \r\n' 36 '22\r\n' 37 'for something completely different\r\n' 38) 39chunked_expected = b'hello world! and now for something completely different' 40chunk_extension = ";foo=bar" 41last_chunk = "0\r\n" 42last_chunk_extended = "0" + chunk_extension + "\r\n" 43trailers = "X-Dummy: foo\r\nX-Dumm2: bar\r\n" 44chunked_end = "\r\n" 45 46HOST = socket_helper.HOST 47 48class FakeSocket: 49 def __init__(self, text, fileclass=io.BytesIO, host=None, port=None): 50 if isinstance(text, str): 51 text = text.encode("ascii") 52 self.text = text 53 self.fileclass = fileclass 54 self.data = b'' 55 self.sendall_calls = 0 56 self.file_closed = False 57 self.host = host 58 self.port = port 59 60 def sendall(self, data): 61 self.sendall_calls += 1 62 self.data += data 63 64 def makefile(self, mode, bufsize=None): 65 if mode != 'r' and mode != 'rb': 66 raise client.UnimplementedFileMode() 67 # keep the file around so we can check how much was read from it 68 self.file = self.fileclass(self.text) 69 self.file.close = self.file_close #nerf close () 70 return self.file 71 72 def file_close(self): 73 self.file_closed = True 74 75 def close(self): 76 pass 77 78 def setsockopt(self, level, optname, value): 79 pass 80 81class EPipeSocket(FakeSocket): 82 83 def __init__(self, text, pipe_trigger): 84 # When sendall() is called with pipe_trigger, raise EPIPE. 85 FakeSocket.__init__(self, text) 86 self.pipe_trigger = pipe_trigger 87 88 def sendall(self, data): 89 if self.pipe_trigger in data: 90 raise OSError(errno.EPIPE, "gotcha") 91 self.data += data 92 93 def close(self): 94 pass 95 96class NoEOFBytesIO(io.BytesIO): 97 """Like BytesIO, but raises AssertionError on EOF. 98 99 This is used below to test that http.client doesn't try to read 100 more from the underlying file than it should. 101 """ 102 def read(self, n=-1): 103 data = io.BytesIO.read(self, n) 104 if data == b'': 105 raise AssertionError('caller tried to read past EOF') 106 return data 107 108 def readline(self, length=None): 109 data = io.BytesIO.readline(self, length) 110 if data == b'': 111 raise AssertionError('caller tried to read past EOF') 112 return data 113 114class FakeSocketHTTPConnection(client.HTTPConnection): 115 """HTTPConnection subclass using FakeSocket; counts connect() calls""" 116 117 def __init__(self, *args): 118 self.connections = 0 119 super().__init__('example.com') 120 self.fake_socket_args = args 121 self._create_connection = self.create_connection 122 123 def connect(self): 124 """Count the number of times connect() is invoked""" 125 self.connections += 1 126 return super().connect() 127 128 def create_connection(self, *pos, **kw): 129 return FakeSocket(*self.fake_socket_args) 130 131class HeaderTests(TestCase): 132 def test_auto_headers(self): 133 # Some headers are added automatically, but should not be added by 134 # .request() if they are explicitly set. 135 136 class HeaderCountingBuffer(list): 137 def __init__(self): 138 self.count = {} 139 def append(self, item): 140 kv = item.split(b':') 141 if len(kv) > 1: 142 # item is a 'Key: Value' header string 143 lcKey = kv[0].decode('ascii').lower() 144 self.count.setdefault(lcKey, 0) 145 self.count[lcKey] += 1 146 list.append(self, item) 147 148 for explicit_header in True, False: 149 for header in 'Content-length', 'Host', 'Accept-encoding': 150 conn = client.HTTPConnection('example.com') 151 conn.sock = FakeSocket('blahblahblah') 152 conn._buffer = HeaderCountingBuffer() 153 154 body = 'spamspamspam' 155 headers = {} 156 if explicit_header: 157 headers[header] = str(len(body)) 158 conn.request('POST', '/', body, headers) 159 self.assertEqual(conn._buffer.count[header.lower()], 1) 160 161 def test_content_length_0(self): 162 163 class ContentLengthChecker(list): 164 def __init__(self): 165 list.__init__(self) 166 self.content_length = None 167 def append(self, item): 168 kv = item.split(b':', 1) 169 if len(kv) > 1 and kv[0].lower() == b'content-length': 170 self.content_length = kv[1].strip() 171 list.append(self, item) 172 173 # Here, we're testing that methods expecting a body get a 174 # content-length set to zero if the body is empty (either None or '') 175 bodies = (None, '') 176 methods_with_body = ('PUT', 'POST', 'PATCH') 177 for method, body in itertools.product(methods_with_body, bodies): 178 conn = client.HTTPConnection('example.com') 179 conn.sock = FakeSocket(None) 180 conn._buffer = ContentLengthChecker() 181 conn.request(method, '/', body) 182 self.assertEqual( 183 conn._buffer.content_length, b'0', 184 'Header Content-Length incorrect on {}'.format(method) 185 ) 186 187 # For these methods, we make sure that content-length is not set when 188 # the body is None because it might cause unexpected behaviour on the 189 # server. 190 methods_without_body = ( 191 'GET', 'CONNECT', 'DELETE', 'HEAD', 'OPTIONS', 'TRACE', 192 ) 193 for method in methods_without_body: 194 conn = client.HTTPConnection('example.com') 195 conn.sock = FakeSocket(None) 196 conn._buffer = ContentLengthChecker() 197 conn.request(method, '/', None) 198 self.assertEqual( 199 conn._buffer.content_length, None, 200 'Header Content-Length set for empty body on {}'.format(method) 201 ) 202 203 # If the body is set to '', that's considered to be "present but 204 # empty" rather than "missing", so content length would be set, even 205 # for methods that don't expect a body. 206 for method in methods_without_body: 207 conn = client.HTTPConnection('example.com') 208 conn.sock = FakeSocket(None) 209 conn._buffer = ContentLengthChecker() 210 conn.request(method, '/', '') 211 self.assertEqual( 212 conn._buffer.content_length, b'0', 213 'Header Content-Length incorrect on {}'.format(method) 214 ) 215 216 # If the body is set, make sure Content-Length is set. 217 for method in itertools.chain(methods_without_body, methods_with_body): 218 conn = client.HTTPConnection('example.com') 219 conn.sock = FakeSocket(None) 220 conn._buffer = ContentLengthChecker() 221 conn.request(method, '/', ' ') 222 self.assertEqual( 223 conn._buffer.content_length, b'1', 224 'Header Content-Length incorrect on {}'.format(method) 225 ) 226 227 def test_putheader(self): 228 conn = client.HTTPConnection('example.com') 229 conn.sock = FakeSocket(None) 230 conn.putrequest('GET','/') 231 conn.putheader('Content-length', 42) 232 self.assertIn(b'Content-length: 42', conn._buffer) 233 234 conn.putheader('Foo', ' bar ') 235 self.assertIn(b'Foo: bar ', conn._buffer) 236 conn.putheader('Bar', '\tbaz\t') 237 self.assertIn(b'Bar: \tbaz\t', conn._buffer) 238 conn.putheader('Authorization', 'Bearer mytoken') 239 self.assertIn(b'Authorization: Bearer mytoken', conn._buffer) 240 conn.putheader('IterHeader', 'IterA', 'IterB') 241 self.assertIn(b'IterHeader: IterA\r\n\tIterB', conn._buffer) 242 conn.putheader('LatinHeader', b'\xFF') 243 self.assertIn(b'LatinHeader: \xFF', conn._buffer) 244 conn.putheader('Utf8Header', b'\xc3\x80') 245 self.assertIn(b'Utf8Header: \xc3\x80', conn._buffer) 246 conn.putheader('C1-Control', b'next\x85line') 247 self.assertIn(b'C1-Control: next\x85line', conn._buffer) 248 conn.putheader('Embedded-Fold-Space', 'is\r\n allowed') 249 self.assertIn(b'Embedded-Fold-Space: is\r\n allowed', conn._buffer) 250 conn.putheader('Embedded-Fold-Tab', 'is\r\n\tallowed') 251 self.assertIn(b'Embedded-Fold-Tab: is\r\n\tallowed', conn._buffer) 252 conn.putheader('Key Space', 'value') 253 self.assertIn(b'Key Space: value', conn._buffer) 254 conn.putheader('KeySpace ', 'value') 255 self.assertIn(b'KeySpace : value', conn._buffer) 256 conn.putheader(b'Nonbreak\xa0Space', 'value') 257 self.assertIn(b'Nonbreak\xa0Space: value', conn._buffer) 258 conn.putheader(b'\xa0NonbreakSpace', 'value') 259 self.assertIn(b'\xa0NonbreakSpace: value', conn._buffer) 260 261 def test_ipv6host_header(self): 262 # Default host header on IPv6 transaction should be wrapped by [] if 263 # it is an IPv6 address 264 expected = b'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \ 265 b'Accept-Encoding: identity\r\n\r\n' 266 conn = client.HTTPConnection('[2001::]:81') 267 sock = FakeSocket('') 268 conn.sock = sock 269 conn.request('GET', '/foo') 270 self.assertTrue(sock.data.startswith(expected)) 271 272 expected = b'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \ 273 b'Accept-Encoding: identity\r\n\r\n' 274 conn = client.HTTPConnection('[2001:102A::]') 275 sock = FakeSocket('') 276 conn.sock = sock 277 conn.request('GET', '/foo') 278 self.assertTrue(sock.data.startswith(expected)) 279 280 def test_malformed_headers_coped_with(self): 281 # Issue 19996 282 body = "HTTP/1.1 200 OK\r\nFirst: val\r\n: nval\r\nSecond: val\r\n\r\n" 283 sock = FakeSocket(body) 284 resp = client.HTTPResponse(sock) 285 resp.begin() 286 287 self.assertEqual(resp.getheader('First'), 'val') 288 self.assertEqual(resp.getheader('Second'), 'val') 289 290 def test_parse_all_octets(self): 291 # Ensure no valid header field octet breaks the parser 292 body = ( 293 b'HTTP/1.1 200 OK\r\n' 294 b"!#$%&'*+-.^_`|~: value\r\n" # Special token characters 295 b'VCHAR: ' + bytes(range(0x21, 0x7E + 1)) + b'\r\n' 296 b'obs-text: ' + bytes(range(0x80, 0xFF + 1)) + b'\r\n' 297 b'obs-fold: text\r\n' 298 b' folded with space\r\n' 299 b'\tfolded with tab\r\n' 300 b'Content-Length: 0\r\n' 301 b'\r\n' 302 ) 303 sock = FakeSocket(body) 304 resp = client.HTTPResponse(sock) 305 resp.begin() 306 self.assertEqual(resp.getheader('Content-Length'), '0') 307 self.assertEqual(resp.msg['Content-Length'], '0') 308 self.assertEqual(resp.getheader("!#$%&'*+-.^_`|~"), 'value') 309 self.assertEqual(resp.msg["!#$%&'*+-.^_`|~"], 'value') 310 vchar = ''.join(map(chr, range(0x21, 0x7E + 1))) 311 self.assertEqual(resp.getheader('VCHAR'), vchar) 312 self.assertEqual(resp.msg['VCHAR'], vchar) 313 self.assertIsNotNone(resp.getheader('obs-text')) 314 self.assertIn('obs-text', resp.msg) 315 for folded in (resp.getheader('obs-fold'), resp.msg['obs-fold']): 316 self.assertTrue(folded.startswith('text')) 317 self.assertIn(' folded with space', folded) 318 self.assertTrue(folded.endswith('folded with tab')) 319 320 def test_invalid_headers(self): 321 conn = client.HTTPConnection('example.com') 322 conn.sock = FakeSocket('') 323 conn.putrequest('GET', '/') 324 325 # http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no 326 # longer allowed in header names 327 cases = ( 328 (b'Invalid\r\nName', b'ValidValue'), 329 (b'Invalid\rName', b'ValidValue'), 330 (b'Invalid\nName', b'ValidValue'), 331 (b'\r\nInvalidName', b'ValidValue'), 332 (b'\rInvalidName', b'ValidValue'), 333 (b'\nInvalidName', b'ValidValue'), 334 (b' InvalidName', b'ValidValue'), 335 (b'\tInvalidName', b'ValidValue'), 336 (b'Invalid:Name', b'ValidValue'), 337 (b':InvalidName', b'ValidValue'), 338 (b'ValidName', b'Invalid\r\nValue'), 339 (b'ValidName', b'Invalid\rValue'), 340 (b'ValidName', b'Invalid\nValue'), 341 (b'ValidName', b'InvalidValue\r\n'), 342 (b'ValidName', b'InvalidValue\r'), 343 (b'ValidName', b'InvalidValue\n'), 344 ) 345 for name, value in cases: 346 with self.subTest((name, value)): 347 with self.assertRaisesRegex(ValueError, 'Invalid header'): 348 conn.putheader(name, value) 349 350 def test_headers_debuglevel(self): 351 body = ( 352 b'HTTP/1.1 200 OK\r\n' 353 b'First: val\r\n' 354 b'Second: val1\r\n' 355 b'Second: val2\r\n' 356 ) 357 sock = FakeSocket(body) 358 resp = client.HTTPResponse(sock, debuglevel=1) 359 with support.captured_stdout() as output: 360 resp.begin() 361 lines = output.getvalue().splitlines() 362 self.assertEqual(lines[0], "reply: 'HTTP/1.1 200 OK\\r\\n'") 363 self.assertEqual(lines[1], "header: First: val") 364 self.assertEqual(lines[2], "header: Second: val1") 365 self.assertEqual(lines[3], "header: Second: val2") 366 367 368class HttpMethodTests(TestCase): 369 def test_invalid_method_names(self): 370 methods = ( 371 'GET\r', 372 'POST\n', 373 'PUT\n\r', 374 'POST\nValue', 375 'POST\nHOST:abc', 376 'GET\nrHost:abc\n', 377 'POST\rRemainder:\r', 378 'GET\rHOST:\n', 379 '\nPUT' 380 ) 381 382 for method in methods: 383 with self.assertRaisesRegex( 384 ValueError, "method can't contain control characters"): 385 conn = client.HTTPConnection('example.com') 386 conn.sock = FakeSocket(None) 387 conn.request(method=method, url="/") 388 389 390class TransferEncodingTest(TestCase): 391 expected_body = b"It's just a flesh wound" 392 393 def test_endheaders_chunked(self): 394 conn = client.HTTPConnection('example.com') 395 conn.sock = FakeSocket(b'') 396 conn.putrequest('POST', '/') 397 conn.endheaders(self._make_body(), encode_chunked=True) 398 399 _, _, body = self._parse_request(conn.sock.data) 400 body = self._parse_chunked(body) 401 self.assertEqual(body, self.expected_body) 402 403 def test_explicit_headers(self): 404 # explicit chunked 405 conn = client.HTTPConnection('example.com') 406 conn.sock = FakeSocket(b'') 407 # this shouldn't actually be automatically chunk-encoded because the 408 # calling code has explicitly stated that it's taking care of it 409 conn.request( 410 'POST', '/', self._make_body(), {'Transfer-Encoding': 'chunked'}) 411 412 _, headers, body = self._parse_request(conn.sock.data) 413 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 414 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 415 self.assertEqual(body, self.expected_body) 416 417 # explicit chunked, string body 418 conn = client.HTTPConnection('example.com') 419 conn.sock = FakeSocket(b'') 420 conn.request( 421 'POST', '/', self.expected_body.decode('latin-1'), 422 {'Transfer-Encoding': 'chunked'}) 423 424 _, headers, body = self._parse_request(conn.sock.data) 425 self.assertNotIn('content-length', [k.lower() for k in headers.keys()]) 426 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 427 self.assertEqual(body, self.expected_body) 428 429 # User-specified TE, but request() does the chunk encoding 430 conn = client.HTTPConnection('example.com') 431 conn.sock = FakeSocket(b'') 432 conn.request('POST', '/', 433 headers={'Transfer-Encoding': 'gzip, chunked'}, 434 encode_chunked=True, 435 body=self._make_body()) 436 _, headers, body = self._parse_request(conn.sock.data) 437 self.assertNotIn('content-length', [k.lower() for k in headers]) 438 self.assertEqual(headers['Transfer-Encoding'], 'gzip, chunked') 439 self.assertEqual(self._parse_chunked(body), self.expected_body) 440 441 def test_request(self): 442 for empty_lines in (False, True,): 443 conn = client.HTTPConnection('example.com') 444 conn.sock = FakeSocket(b'') 445 conn.request( 446 'POST', '/', self._make_body(empty_lines=empty_lines)) 447 448 _, headers, body = self._parse_request(conn.sock.data) 449 body = self._parse_chunked(body) 450 self.assertEqual(body, self.expected_body) 451 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 452 453 # Content-Length and Transfer-Encoding SHOULD not be sent in the 454 # same request 455 self.assertNotIn('content-length', [k.lower() for k in headers]) 456 457 def test_empty_body(self): 458 # Zero-length iterable should be treated like any other iterable 459 conn = client.HTTPConnection('example.com') 460 conn.sock = FakeSocket(b'') 461 conn.request('POST', '/', ()) 462 _, headers, body = self._parse_request(conn.sock.data) 463 self.assertEqual(headers['Transfer-Encoding'], 'chunked') 464 self.assertNotIn('content-length', [k.lower() for k in headers]) 465 self.assertEqual(body, b"0\r\n\r\n") 466 467 def _make_body(self, empty_lines=False): 468 lines = self.expected_body.split(b' ') 469 for idx, line in enumerate(lines): 470 # for testing handling empty lines 471 if empty_lines and idx % 2: 472 yield b'' 473 if idx < len(lines) - 1: 474 yield line + b' ' 475 else: 476 yield line 477 478 def _parse_request(self, data): 479 lines = data.split(b'\r\n') 480 request = lines[0] 481 headers = {} 482 n = 1 483 while n < len(lines) and len(lines[n]) > 0: 484 key, val = lines[n].split(b':') 485 key = key.decode('latin-1').strip() 486 headers[key] = val.decode('latin-1').strip() 487 n += 1 488 489 return request, headers, b'\r\n'.join(lines[n + 1:]) 490 491 def _parse_chunked(self, data): 492 body = [] 493 trailers = {} 494 n = 0 495 lines = data.split(b'\r\n') 496 # parse body 497 while True: 498 size, chunk = lines[n:n+2] 499 size = int(size, 16) 500 501 if size == 0: 502 n += 1 503 break 504 505 self.assertEqual(size, len(chunk)) 506 body.append(chunk) 507 508 n += 2 509 # we /should/ hit the end chunk, but check against the size of 510 # lines so we're not stuck in an infinite loop should we get 511 # malformed data 512 if n > len(lines): 513 break 514 515 return b''.join(body) 516 517 518class BasicTest(TestCase): 519 def test_status_lines(self): 520 # Test HTTP status lines 521 522 body = "HTTP/1.1 200 Ok\r\n\r\nText" 523 sock = FakeSocket(body) 524 resp = client.HTTPResponse(sock) 525 resp.begin() 526 self.assertEqual(resp.read(0), b'') # Issue #20007 527 self.assertFalse(resp.isclosed()) 528 self.assertFalse(resp.closed) 529 self.assertEqual(resp.read(), b"Text") 530 self.assertTrue(resp.isclosed()) 531 self.assertFalse(resp.closed) 532 resp.close() 533 self.assertTrue(resp.closed) 534 535 body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" 536 sock = FakeSocket(body) 537 resp = client.HTTPResponse(sock) 538 self.assertRaises(client.BadStatusLine, resp.begin) 539 540 def test_bad_status_repr(self): 541 exc = client.BadStatusLine('') 542 self.assertEqual(repr(exc), '''BadStatusLine("''")''') 543 544 def test_partial_reads(self): 545 # if we have Content-Length, HTTPResponse knows when to close itself, 546 # the same behaviour as when we read the whole thing with read() 547 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 548 sock = FakeSocket(body) 549 resp = client.HTTPResponse(sock) 550 resp.begin() 551 self.assertEqual(resp.read(2), b'Te') 552 self.assertFalse(resp.isclosed()) 553 self.assertEqual(resp.read(2), b'xt') 554 self.assertTrue(resp.isclosed()) 555 self.assertFalse(resp.closed) 556 resp.close() 557 self.assertTrue(resp.closed) 558 559 def test_mixed_reads(self): 560 # readline() should update the remaining length, so that read() knows 561 # how much data is left and does not raise IncompleteRead 562 body = "HTTP/1.1 200 Ok\r\nContent-Length: 13\r\n\r\nText\r\nAnother" 563 sock = FakeSocket(body) 564 resp = client.HTTPResponse(sock) 565 resp.begin() 566 self.assertEqual(resp.readline(), b'Text\r\n') 567 self.assertFalse(resp.isclosed()) 568 self.assertEqual(resp.read(), b'Another') 569 self.assertTrue(resp.isclosed()) 570 self.assertFalse(resp.closed) 571 resp.close() 572 self.assertTrue(resp.closed) 573 574 def test_partial_readintos(self): 575 # if we have Content-Length, HTTPResponse knows when to close itself, 576 # the same behaviour as when we read the whole thing with read() 577 body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText" 578 sock = FakeSocket(body) 579 resp = client.HTTPResponse(sock) 580 resp.begin() 581 b = bytearray(2) 582 n = resp.readinto(b) 583 self.assertEqual(n, 2) 584 self.assertEqual(bytes(b), b'Te') 585 self.assertFalse(resp.isclosed()) 586 n = resp.readinto(b) 587 self.assertEqual(n, 2) 588 self.assertEqual(bytes(b), b'xt') 589 self.assertTrue(resp.isclosed()) 590 self.assertFalse(resp.closed) 591 resp.close() 592 self.assertTrue(resp.closed) 593 594 def test_partial_reads_no_content_length(self): 595 # when no length is present, the socket should be gracefully closed when 596 # all data was read 597 body = "HTTP/1.1 200 Ok\r\n\r\nText" 598 sock = FakeSocket(body) 599 resp = client.HTTPResponse(sock) 600 resp.begin() 601 self.assertEqual(resp.read(2), b'Te') 602 self.assertFalse(resp.isclosed()) 603 self.assertEqual(resp.read(2), b'xt') 604 self.assertEqual(resp.read(1), b'') 605 self.assertTrue(resp.isclosed()) 606 self.assertFalse(resp.closed) 607 resp.close() 608 self.assertTrue(resp.closed) 609 610 def test_partial_readintos_no_content_length(self): 611 # when no length is present, the socket should be gracefully closed when 612 # all data was read 613 body = "HTTP/1.1 200 Ok\r\n\r\nText" 614 sock = FakeSocket(body) 615 resp = client.HTTPResponse(sock) 616 resp.begin() 617 b = bytearray(2) 618 n = resp.readinto(b) 619 self.assertEqual(n, 2) 620 self.assertEqual(bytes(b), b'Te') 621 self.assertFalse(resp.isclosed()) 622 n = resp.readinto(b) 623 self.assertEqual(n, 2) 624 self.assertEqual(bytes(b), b'xt') 625 n = resp.readinto(b) 626 self.assertEqual(n, 0) 627 self.assertTrue(resp.isclosed()) 628 629 def test_partial_reads_incomplete_body(self): 630 # if the server shuts down the connection before the whole 631 # content-length is delivered, the socket is gracefully closed 632 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 633 sock = FakeSocket(body) 634 resp = client.HTTPResponse(sock) 635 resp.begin() 636 self.assertEqual(resp.read(2), b'Te') 637 self.assertFalse(resp.isclosed()) 638 self.assertEqual(resp.read(2), b'xt') 639 self.assertEqual(resp.read(1), b'') 640 self.assertTrue(resp.isclosed()) 641 642 def test_partial_readintos_incomplete_body(self): 643 # if the server shuts down the connection before the whole 644 # content-length is delivered, the socket is gracefully closed 645 body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText" 646 sock = FakeSocket(body) 647 resp = client.HTTPResponse(sock) 648 resp.begin() 649 b = bytearray(2) 650 n = resp.readinto(b) 651 self.assertEqual(n, 2) 652 self.assertEqual(bytes(b), b'Te') 653 self.assertFalse(resp.isclosed()) 654 n = resp.readinto(b) 655 self.assertEqual(n, 2) 656 self.assertEqual(bytes(b), b'xt') 657 n = resp.readinto(b) 658 self.assertEqual(n, 0) 659 self.assertTrue(resp.isclosed()) 660 self.assertFalse(resp.closed) 661 resp.close() 662 self.assertTrue(resp.closed) 663 664 def test_host_port(self): 665 # Check invalid host_port 666 667 for hp in ("www.python.org:abc", "user:password@www.python.org"): 668 self.assertRaises(client.InvalidURL, client.HTTPConnection, hp) 669 670 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 671 "fe80::207:e9ff:fe9b", 8000), 672 ("www.python.org:80", "www.python.org", 80), 673 ("www.python.org:", "www.python.org", 80), 674 ("www.python.org", "www.python.org", 80), 675 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80), 676 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 80)): 677 c = client.HTTPConnection(hp) 678 self.assertEqual(h, c.host) 679 self.assertEqual(p, c.port) 680 681 def test_response_headers(self): 682 # test response with multiple message headers with the same field name. 683 text = ('HTTP/1.1 200 OK\r\n' 684 'Set-Cookie: Customer="WILE_E_COYOTE"; ' 685 'Version="1"; Path="/acme"\r\n' 686 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' 687 ' Path="/acme"\r\n' 688 '\r\n' 689 'No body\r\n') 690 hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' 691 ', ' 692 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') 693 s = FakeSocket(text) 694 r = client.HTTPResponse(s) 695 r.begin() 696 cookies = r.getheader("Set-Cookie") 697 self.assertEqual(cookies, hdr) 698 699 def test_read_head(self): 700 # Test that the library doesn't attempt to read any data 701 # from a HEAD request. (Tickles SF bug #622042.) 702 sock = FakeSocket( 703 'HTTP/1.1 200 OK\r\n' 704 'Content-Length: 14432\r\n' 705 '\r\n', 706 NoEOFBytesIO) 707 resp = client.HTTPResponse(sock, method="HEAD") 708 resp.begin() 709 if resp.read(): 710 self.fail("Did not expect response from HEAD request") 711 712 def test_readinto_head(self): 713 # Test that the library doesn't attempt to read any data 714 # from a HEAD request. (Tickles SF bug #622042.) 715 sock = FakeSocket( 716 'HTTP/1.1 200 OK\r\n' 717 'Content-Length: 14432\r\n' 718 '\r\n', 719 NoEOFBytesIO) 720 resp = client.HTTPResponse(sock, method="HEAD") 721 resp.begin() 722 b = bytearray(5) 723 if resp.readinto(b) != 0: 724 self.fail("Did not expect response from HEAD request") 725 self.assertEqual(bytes(b), b'\x00'*5) 726 727 def test_too_many_headers(self): 728 headers = '\r\n'.join('Header%d: foo' % i 729 for i in range(client._MAXHEADERS + 1)) + '\r\n' 730 text = ('HTTP/1.1 200 OK\r\n' + headers) 731 s = FakeSocket(text) 732 r = client.HTTPResponse(s) 733 self.assertRaisesRegex(client.HTTPException, 734 r"got more than \d+ headers", r.begin) 735 736 def test_send_file(self): 737 expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' 738 b'Accept-Encoding: identity\r\n' 739 b'Transfer-Encoding: chunked\r\n' 740 b'\r\n') 741 742 with open(__file__, 'rb') as body: 743 conn = client.HTTPConnection('example.com') 744 sock = FakeSocket(body) 745 conn.sock = sock 746 conn.request('GET', '/foo', body) 747 self.assertTrue(sock.data.startswith(expected), '%r != %r' % 748 (sock.data[:len(expected)], expected)) 749 750 def test_send(self): 751 expected = b'this is a test this is only a test' 752 conn = client.HTTPConnection('example.com') 753 sock = FakeSocket(None) 754 conn.sock = sock 755 conn.send(expected) 756 self.assertEqual(expected, sock.data) 757 sock.data = b'' 758 conn.send(array.array('b', expected)) 759 self.assertEqual(expected, sock.data) 760 sock.data = b'' 761 conn.send(io.BytesIO(expected)) 762 self.assertEqual(expected, sock.data) 763 764 def test_send_updating_file(self): 765 def data(): 766 yield 'data' 767 yield None 768 yield 'data_two' 769 770 class UpdatingFile(io.TextIOBase): 771 mode = 'r' 772 d = data() 773 def read(self, blocksize=-1): 774 return next(self.d) 775 776 expected = b'data' 777 778 conn = client.HTTPConnection('example.com') 779 sock = FakeSocket("") 780 conn.sock = sock 781 conn.send(UpdatingFile()) 782 self.assertEqual(sock.data, expected) 783 784 785 def test_send_iter(self): 786 expected = b'GET /foo HTTP/1.1\r\nHost: example.com\r\n' \ 787 b'Accept-Encoding: identity\r\nContent-Length: 11\r\n' \ 788 b'\r\nonetwothree' 789 790 def body(): 791 yield b"one" 792 yield b"two" 793 yield b"three" 794 795 conn = client.HTTPConnection('example.com') 796 sock = FakeSocket("") 797 conn.sock = sock 798 conn.request('GET', '/foo', body(), {'Content-Length': '11'}) 799 self.assertEqual(sock.data, expected) 800 801 def test_blocksize_request(self): 802 """Check that request() respects the configured block size.""" 803 blocksize = 8 # For easy debugging. 804 conn = client.HTTPConnection('example.com', blocksize=blocksize) 805 sock = FakeSocket(None) 806 conn.sock = sock 807 expected = b"a" * blocksize + b"b" 808 conn.request("PUT", "/", io.BytesIO(expected), {"Content-Length": "9"}) 809 self.assertEqual(sock.sendall_calls, 3) 810 body = sock.data.split(b"\r\n\r\n", 1)[1] 811 self.assertEqual(body, expected) 812 813 def test_blocksize_send(self): 814 """Check that send() respects the configured block size.""" 815 blocksize = 8 # For easy debugging. 816 conn = client.HTTPConnection('example.com', blocksize=blocksize) 817 sock = FakeSocket(None) 818 conn.sock = sock 819 expected = b"a" * blocksize + b"b" 820 conn.send(io.BytesIO(expected)) 821 self.assertEqual(sock.sendall_calls, 2) 822 self.assertEqual(sock.data, expected) 823 824 def test_send_type_error(self): 825 # See: Issue #12676 826 conn = client.HTTPConnection('example.com') 827 conn.sock = FakeSocket('') 828 with self.assertRaises(TypeError): 829 conn.request('POST', 'test', conn) 830 831 def test_chunked(self): 832 expected = chunked_expected 833 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 834 resp = client.HTTPResponse(sock, method="GET") 835 resp.begin() 836 self.assertEqual(resp.read(), expected) 837 resp.close() 838 839 # Various read sizes 840 for n in range(1, 12): 841 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 842 resp = client.HTTPResponse(sock, method="GET") 843 resp.begin() 844 self.assertEqual(resp.read(n) + resp.read(n) + resp.read(), expected) 845 resp.close() 846 847 for x in ('', 'foo\r\n'): 848 sock = FakeSocket(chunked_start + x) 849 resp = client.HTTPResponse(sock, method="GET") 850 resp.begin() 851 try: 852 resp.read() 853 except client.IncompleteRead as i: 854 self.assertEqual(i.partial, expected) 855 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 856 self.assertEqual(repr(i), expected_message) 857 self.assertEqual(str(i), expected_message) 858 else: 859 self.fail('IncompleteRead expected') 860 finally: 861 resp.close() 862 863 def test_readinto_chunked(self): 864 865 expected = chunked_expected 866 nexpected = len(expected) 867 b = bytearray(128) 868 869 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 870 resp = client.HTTPResponse(sock, method="GET") 871 resp.begin() 872 n = resp.readinto(b) 873 self.assertEqual(b[:nexpected], expected) 874 self.assertEqual(n, nexpected) 875 resp.close() 876 877 # Various read sizes 878 for n in range(1, 12): 879 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 880 resp = client.HTTPResponse(sock, method="GET") 881 resp.begin() 882 m = memoryview(b) 883 i = resp.readinto(m[0:n]) 884 i += resp.readinto(m[i:n + i]) 885 i += resp.readinto(m[i:]) 886 self.assertEqual(b[:nexpected], expected) 887 self.assertEqual(i, nexpected) 888 resp.close() 889 890 for x in ('', 'foo\r\n'): 891 sock = FakeSocket(chunked_start + x) 892 resp = client.HTTPResponse(sock, method="GET") 893 resp.begin() 894 try: 895 n = resp.readinto(b) 896 except client.IncompleteRead as i: 897 self.assertEqual(i.partial, expected) 898 expected_message = 'IncompleteRead(%d bytes read)' % len(expected) 899 self.assertEqual(repr(i), expected_message) 900 self.assertEqual(str(i), expected_message) 901 else: 902 self.fail('IncompleteRead expected') 903 finally: 904 resp.close() 905 906 def test_chunked_head(self): 907 chunked_start = ( 908 'HTTP/1.1 200 OK\r\n' 909 'Transfer-Encoding: chunked\r\n\r\n' 910 'a\r\n' 911 'hello world\r\n' 912 '1\r\n' 913 'd\r\n' 914 ) 915 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 916 resp = client.HTTPResponse(sock, method="HEAD") 917 resp.begin() 918 self.assertEqual(resp.read(), b'') 919 self.assertEqual(resp.status, 200) 920 self.assertEqual(resp.reason, 'OK') 921 self.assertTrue(resp.isclosed()) 922 self.assertFalse(resp.closed) 923 resp.close() 924 self.assertTrue(resp.closed) 925 926 def test_readinto_chunked_head(self): 927 chunked_start = ( 928 'HTTP/1.1 200 OK\r\n' 929 'Transfer-Encoding: chunked\r\n\r\n' 930 'a\r\n' 931 'hello world\r\n' 932 '1\r\n' 933 'd\r\n' 934 ) 935 sock = FakeSocket(chunked_start + last_chunk + chunked_end) 936 resp = client.HTTPResponse(sock, method="HEAD") 937 resp.begin() 938 b = bytearray(5) 939 n = resp.readinto(b) 940 self.assertEqual(n, 0) 941 self.assertEqual(bytes(b), b'\x00'*5) 942 self.assertEqual(resp.status, 200) 943 self.assertEqual(resp.reason, 'OK') 944 self.assertTrue(resp.isclosed()) 945 self.assertFalse(resp.closed) 946 resp.close() 947 self.assertTrue(resp.closed) 948 949 def test_negative_content_length(self): 950 sock = FakeSocket( 951 'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n') 952 resp = client.HTTPResponse(sock, method="GET") 953 resp.begin() 954 self.assertEqual(resp.read(), b'Hello\r\n') 955 self.assertTrue(resp.isclosed()) 956 957 def test_incomplete_read(self): 958 sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n') 959 resp = client.HTTPResponse(sock, method="GET") 960 resp.begin() 961 try: 962 resp.read() 963 except client.IncompleteRead as i: 964 self.assertEqual(i.partial, b'Hello\r\n') 965 self.assertEqual(repr(i), 966 "IncompleteRead(7 bytes read, 3 more expected)") 967 self.assertEqual(str(i), 968 "IncompleteRead(7 bytes read, 3 more expected)") 969 self.assertTrue(resp.isclosed()) 970 else: 971 self.fail('IncompleteRead expected') 972 973 def test_epipe(self): 974 sock = EPipeSocket( 975 "HTTP/1.0 401 Authorization Required\r\n" 976 "Content-type: text/html\r\n" 977 "WWW-Authenticate: Basic realm=\"example\"\r\n", 978 b"Content-Length") 979 conn = client.HTTPConnection("example.com") 980 conn.sock = sock 981 self.assertRaises(OSError, 982 lambda: conn.request("PUT", "/url", "body")) 983 resp = conn.getresponse() 984 self.assertEqual(401, resp.status) 985 self.assertEqual("Basic realm=\"example\"", 986 resp.getheader("www-authenticate")) 987 988 # Test lines overflowing the max line size (_MAXLINE in http.client) 989 990 def test_overflowing_status_line(self): 991 body = "HTTP/1.1 200 Ok" + "k" * 65536 + "\r\n" 992 resp = client.HTTPResponse(FakeSocket(body)) 993 self.assertRaises((client.LineTooLong, client.BadStatusLine), resp.begin) 994 995 def test_overflowing_header_line(self): 996 body = ( 997 'HTTP/1.1 200 OK\r\n' 998 'X-Foo: bar' + 'r' * 65536 + '\r\n\r\n' 999 ) 1000 resp = client.HTTPResponse(FakeSocket(body)) 1001 self.assertRaises(client.LineTooLong, resp.begin) 1002 1003 def test_overflowing_chunked_line(self): 1004 body = ( 1005 'HTTP/1.1 200 OK\r\n' 1006 'Transfer-Encoding: chunked\r\n\r\n' 1007 + '0' * 65536 + 'a\r\n' 1008 'hello world\r\n' 1009 '0\r\n' 1010 '\r\n' 1011 ) 1012 resp = client.HTTPResponse(FakeSocket(body)) 1013 resp.begin() 1014 self.assertRaises(client.LineTooLong, resp.read) 1015 1016 def test_early_eof(self): 1017 # Test httpresponse with no \r\n termination, 1018 body = "HTTP/1.1 200 Ok" 1019 sock = FakeSocket(body) 1020 resp = client.HTTPResponse(sock) 1021 resp.begin() 1022 self.assertEqual(resp.read(), b'') 1023 self.assertTrue(resp.isclosed()) 1024 self.assertFalse(resp.closed) 1025 resp.close() 1026 self.assertTrue(resp.closed) 1027 1028 def test_error_leak(self): 1029 # Test that the socket is not leaked if getresponse() fails 1030 conn = client.HTTPConnection('example.com') 1031 response = None 1032 class Response(client.HTTPResponse): 1033 def __init__(self, *pos, **kw): 1034 nonlocal response 1035 response = self # Avoid garbage collector closing the socket 1036 client.HTTPResponse.__init__(self, *pos, **kw) 1037 conn.response_class = Response 1038 conn.sock = FakeSocket('Invalid status line') 1039 conn.request('GET', '/') 1040 self.assertRaises(client.BadStatusLine, conn.getresponse) 1041 self.assertTrue(response.closed) 1042 self.assertTrue(conn.sock.file_closed) 1043 1044 def test_chunked_extension(self): 1045 extra = '3;foo=bar\r\n' + 'abc\r\n' 1046 expected = chunked_expected + b'abc' 1047 1048 sock = FakeSocket(chunked_start + extra + last_chunk_extended + chunked_end) 1049 resp = client.HTTPResponse(sock, method="GET") 1050 resp.begin() 1051 self.assertEqual(resp.read(), expected) 1052 resp.close() 1053 1054 def test_chunked_missing_end(self): 1055 """some servers may serve up a short chunked encoding stream""" 1056 expected = chunked_expected 1057 sock = FakeSocket(chunked_start + last_chunk) #no terminating crlf 1058 resp = client.HTTPResponse(sock, method="GET") 1059 resp.begin() 1060 self.assertEqual(resp.read(), expected) 1061 resp.close() 1062 1063 def test_chunked_trailers(self): 1064 """See that trailers are read and ignored""" 1065 expected = chunked_expected 1066 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end) 1067 resp = client.HTTPResponse(sock, method="GET") 1068 resp.begin() 1069 self.assertEqual(resp.read(), expected) 1070 # we should have reached the end of the file 1071 self.assertEqual(sock.file.read(), b"") #we read to the end 1072 resp.close() 1073 1074 def test_chunked_sync(self): 1075 """Check that we don't read past the end of the chunked-encoding stream""" 1076 expected = chunked_expected 1077 extradata = "extradata" 1078 sock = FakeSocket(chunked_start + last_chunk + trailers + chunked_end + extradata) 1079 resp = client.HTTPResponse(sock, method="GET") 1080 resp.begin() 1081 self.assertEqual(resp.read(), expected) 1082 # the file should now have our extradata ready to be read 1083 self.assertEqual(sock.file.read(), extradata.encode("ascii")) #we read to the end 1084 resp.close() 1085 1086 def test_content_length_sync(self): 1087 """Check that we don't read past the end of the Content-Length stream""" 1088 extradata = b"extradata" 1089 expected = b"Hello123\r\n" 1090 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1091 resp = client.HTTPResponse(sock, method="GET") 1092 resp.begin() 1093 self.assertEqual(resp.read(), expected) 1094 # the file should now have our extradata ready to be read 1095 self.assertEqual(sock.file.read(), extradata) #we read to the end 1096 resp.close() 1097 1098 def test_readlines_content_length(self): 1099 extradata = b"extradata" 1100 expected = b"Hello123\r\n" 1101 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1102 resp = client.HTTPResponse(sock, method="GET") 1103 resp.begin() 1104 self.assertEqual(resp.readlines(2000), [expected]) 1105 # the file should now have our extradata ready to be read 1106 self.assertEqual(sock.file.read(), extradata) #we read to the end 1107 resp.close() 1108 1109 def test_read1_content_length(self): 1110 extradata = b"extradata" 1111 expected = b"Hello123\r\n" 1112 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1113 resp = client.HTTPResponse(sock, method="GET") 1114 resp.begin() 1115 self.assertEqual(resp.read1(2000), expected) 1116 # the file should now have our extradata ready to be read 1117 self.assertEqual(sock.file.read(), extradata) #we read to the end 1118 resp.close() 1119 1120 def test_readline_bound_content_length(self): 1121 extradata = b"extradata" 1122 expected = b"Hello123\r\n" 1123 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n' + expected + extradata) 1124 resp = client.HTTPResponse(sock, method="GET") 1125 resp.begin() 1126 self.assertEqual(resp.readline(10), expected) 1127 self.assertEqual(resp.readline(10), b"") 1128 # the file should now have our extradata ready to be read 1129 self.assertEqual(sock.file.read(), extradata) #we read to the end 1130 resp.close() 1131 1132 def test_read1_bound_content_length(self): 1133 extradata = b"extradata" 1134 expected = b"Hello123\r\n" 1135 sock = FakeSocket(b'HTTP/1.1 200 OK\r\nContent-Length: 30\r\n\r\n' + expected*3 + extradata) 1136 resp = client.HTTPResponse(sock, method="GET") 1137 resp.begin() 1138 self.assertEqual(resp.read1(20), expected*2) 1139 self.assertEqual(resp.read(), expected) 1140 # the file should now have our extradata ready to be read 1141 self.assertEqual(sock.file.read(), extradata) #we read to the end 1142 resp.close() 1143 1144 def test_response_fileno(self): 1145 # Make sure fd returned by fileno is valid. 1146 serv = socket.create_server((HOST, 0)) 1147 self.addCleanup(serv.close) 1148 1149 result = None 1150 def run_server(): 1151 [conn, address] = serv.accept() 1152 with conn, conn.makefile("rb") as reader: 1153 # Read the request header until a blank line 1154 while True: 1155 line = reader.readline() 1156 if not line.rstrip(b"\r\n"): 1157 break 1158 conn.sendall(b"HTTP/1.1 200 Connection established\r\n\r\n") 1159 nonlocal result 1160 result = reader.read() 1161 1162 thread = threading.Thread(target=run_server) 1163 thread.start() 1164 self.addCleanup(thread.join, float(1)) 1165 conn = client.HTTPConnection(*serv.getsockname()) 1166 conn.request("CONNECT", "dummy:1234") 1167 response = conn.getresponse() 1168 try: 1169 self.assertEqual(response.status, client.OK) 1170 s = socket.socket(fileno=response.fileno()) 1171 try: 1172 s.sendall(b"proxied data\n") 1173 finally: 1174 s.detach() 1175 finally: 1176 response.close() 1177 conn.close() 1178 thread.join() 1179 self.assertEqual(result, b"proxied data\n") 1180 1181 def test_putrequest_override_domain_validation(self): 1182 """ 1183 It should be possible to override the default validation 1184 behavior in putrequest (bpo-38216). 1185 """ 1186 class UnsafeHTTPConnection(client.HTTPConnection): 1187 def _validate_path(self, url): 1188 pass 1189 1190 conn = UnsafeHTTPConnection('example.com') 1191 conn.sock = FakeSocket('') 1192 conn.putrequest('GET', '/\x00') 1193 1194 def test_putrequest_override_host_validation(self): 1195 class UnsafeHTTPConnection(client.HTTPConnection): 1196 def _validate_host(self, url): 1197 pass 1198 1199 conn = UnsafeHTTPConnection('example.com\r\n') 1200 conn.sock = FakeSocket('') 1201 # set skip_host so a ValueError is not raised upon adding the 1202 # invalid URL as the value of the "Host:" header 1203 conn.putrequest('GET', '/', skip_host=1) 1204 1205 def test_putrequest_override_encoding(self): 1206 """ 1207 It should be possible to override the default encoding 1208 to transmit bytes in another encoding even if invalid 1209 (bpo-36274). 1210 """ 1211 class UnsafeHTTPConnection(client.HTTPConnection): 1212 def _encode_request(self, str_url): 1213 return str_url.encode('utf-8') 1214 1215 conn = UnsafeHTTPConnection('example.com') 1216 conn.sock = FakeSocket('') 1217 conn.putrequest('GET', '/☃') 1218 1219 1220class ExtendedReadTest(TestCase): 1221 """ 1222 Test peek(), read1(), readline() 1223 """ 1224 lines = ( 1225 'HTTP/1.1 200 OK\r\n' 1226 '\r\n' 1227 'hello world!\n' 1228 'and now \n' 1229 'for something completely different\n' 1230 'foo' 1231 ) 1232 lines_expected = lines[lines.find('hello'):].encode("ascii") 1233 lines_chunked = ( 1234 'HTTP/1.1 200 OK\r\n' 1235 'Transfer-Encoding: chunked\r\n\r\n' 1236 'a\r\n' 1237 'hello worl\r\n' 1238 '3\r\n' 1239 'd!\n\r\n' 1240 '9\r\n' 1241 'and now \n\r\n' 1242 '23\r\n' 1243 'for something completely different\n\r\n' 1244 '3\r\n' 1245 'foo\r\n' 1246 '0\r\n' # terminating chunk 1247 '\r\n' # end of trailers 1248 ) 1249 1250 def setUp(self): 1251 sock = FakeSocket(self.lines) 1252 resp = client.HTTPResponse(sock, method="GET") 1253 resp.begin() 1254 resp.fp = io.BufferedReader(resp.fp) 1255 self.resp = resp 1256 1257 1258 1259 def test_peek(self): 1260 resp = self.resp 1261 # patch up the buffered peek so that it returns not too much stuff 1262 oldpeek = resp.fp.peek 1263 def mypeek(n=-1): 1264 p = oldpeek(n) 1265 if n >= 0: 1266 return p[:n] 1267 return p[:10] 1268 resp.fp.peek = mypeek 1269 1270 all = [] 1271 while True: 1272 # try a short peek 1273 p = resp.peek(3) 1274 if p: 1275 self.assertGreater(len(p), 0) 1276 # then unbounded peek 1277 p2 = resp.peek() 1278 self.assertGreaterEqual(len(p2), len(p)) 1279 self.assertTrue(p2.startswith(p)) 1280 next = resp.read(len(p2)) 1281 self.assertEqual(next, p2) 1282 else: 1283 next = resp.read() 1284 self.assertFalse(next) 1285 all.append(next) 1286 if not next: 1287 break 1288 self.assertEqual(b"".join(all), self.lines_expected) 1289 1290 def test_readline(self): 1291 resp = self.resp 1292 self._verify_readline(self.resp.readline, self.lines_expected) 1293 1294 def _verify_readline(self, readline, expected): 1295 all = [] 1296 while True: 1297 # short readlines 1298 line = readline(5) 1299 if line and line != b"foo": 1300 if len(line) < 5: 1301 self.assertTrue(line.endswith(b"\n")) 1302 all.append(line) 1303 if not line: 1304 break 1305 self.assertEqual(b"".join(all), expected) 1306 1307 def test_read1(self): 1308 resp = self.resp 1309 def r(): 1310 res = resp.read1(4) 1311 self.assertLessEqual(len(res), 4) 1312 return res 1313 readliner = Readliner(r) 1314 self._verify_readline(readliner.readline, self.lines_expected) 1315 1316 def test_read1_unbounded(self): 1317 resp = self.resp 1318 all = [] 1319 while True: 1320 data = resp.read1() 1321 if not data: 1322 break 1323 all.append(data) 1324 self.assertEqual(b"".join(all), self.lines_expected) 1325 1326 def test_read1_bounded(self): 1327 resp = self.resp 1328 all = [] 1329 while True: 1330 data = resp.read1(10) 1331 if not data: 1332 break 1333 self.assertLessEqual(len(data), 10) 1334 all.append(data) 1335 self.assertEqual(b"".join(all), self.lines_expected) 1336 1337 def test_read1_0(self): 1338 self.assertEqual(self.resp.read1(0), b"") 1339 1340 def test_peek_0(self): 1341 p = self.resp.peek(0) 1342 self.assertLessEqual(0, len(p)) 1343 1344 1345class ExtendedReadTestChunked(ExtendedReadTest): 1346 """ 1347 Test peek(), read1(), readline() in chunked mode 1348 """ 1349 lines = ( 1350 'HTTP/1.1 200 OK\r\n' 1351 'Transfer-Encoding: chunked\r\n\r\n' 1352 'a\r\n' 1353 'hello worl\r\n' 1354 '3\r\n' 1355 'd!\n\r\n' 1356 '9\r\n' 1357 'and now \n\r\n' 1358 '23\r\n' 1359 'for something completely different\n\r\n' 1360 '3\r\n' 1361 'foo\r\n' 1362 '0\r\n' # terminating chunk 1363 '\r\n' # end of trailers 1364 ) 1365 1366 1367class Readliner: 1368 """ 1369 a simple readline class that uses an arbitrary read function and buffering 1370 """ 1371 def __init__(self, readfunc): 1372 self.readfunc = readfunc 1373 self.remainder = b"" 1374 1375 def readline(self, limit): 1376 data = [] 1377 datalen = 0 1378 read = self.remainder 1379 try: 1380 while True: 1381 idx = read.find(b'\n') 1382 if idx != -1: 1383 break 1384 if datalen + len(read) >= limit: 1385 idx = limit - datalen - 1 1386 # read more data 1387 data.append(read) 1388 read = self.readfunc() 1389 if not read: 1390 idx = 0 #eof condition 1391 break 1392 idx += 1 1393 data.append(read[:idx]) 1394 self.remainder = read[idx:] 1395 return b"".join(data) 1396 except: 1397 self.remainder = b"".join(data) 1398 raise 1399 1400 1401class OfflineTest(TestCase): 1402 def test_all(self): 1403 # Documented objects defined in the module should be in __all__ 1404 expected = {"responses"} # White-list documented dict() object 1405 # HTTPMessage, parse_headers(), and the HTTP status code constants are 1406 # intentionally omitted for simplicity 1407 blacklist = {"HTTPMessage", "parse_headers"} 1408 for name in dir(client): 1409 if name.startswith("_") or name in blacklist: 1410 continue 1411 module_object = getattr(client, name) 1412 if getattr(module_object, "__module__", None) == "http.client": 1413 expected.add(name) 1414 self.assertCountEqual(client.__all__, expected) 1415 1416 def test_responses(self): 1417 self.assertEqual(client.responses[client.NOT_FOUND], "Not Found") 1418 1419 def test_client_constants(self): 1420 # Make sure we don't break backward compatibility with 3.4 1421 expected = [ 1422 'CONTINUE', 1423 'SWITCHING_PROTOCOLS', 1424 'PROCESSING', 1425 'OK', 1426 'CREATED', 1427 'ACCEPTED', 1428 'NON_AUTHORITATIVE_INFORMATION', 1429 'NO_CONTENT', 1430 'RESET_CONTENT', 1431 'PARTIAL_CONTENT', 1432 'MULTI_STATUS', 1433 'IM_USED', 1434 'MULTIPLE_CHOICES', 1435 'MOVED_PERMANENTLY', 1436 'FOUND', 1437 'SEE_OTHER', 1438 'NOT_MODIFIED', 1439 'USE_PROXY', 1440 'TEMPORARY_REDIRECT', 1441 'BAD_REQUEST', 1442 'UNAUTHORIZED', 1443 'PAYMENT_REQUIRED', 1444 'FORBIDDEN', 1445 'NOT_FOUND', 1446 'METHOD_NOT_ALLOWED', 1447 'NOT_ACCEPTABLE', 1448 'PROXY_AUTHENTICATION_REQUIRED', 1449 'REQUEST_TIMEOUT', 1450 'CONFLICT', 1451 'GONE', 1452 'LENGTH_REQUIRED', 1453 'PRECONDITION_FAILED', 1454 'REQUEST_ENTITY_TOO_LARGE', 1455 'REQUEST_URI_TOO_LONG', 1456 'UNSUPPORTED_MEDIA_TYPE', 1457 'REQUESTED_RANGE_NOT_SATISFIABLE', 1458 'EXPECTATION_FAILED', 1459 'IM_A_TEAPOT', 1460 'MISDIRECTED_REQUEST', 1461 'UNPROCESSABLE_ENTITY', 1462 'LOCKED', 1463 'FAILED_DEPENDENCY', 1464 'UPGRADE_REQUIRED', 1465 'PRECONDITION_REQUIRED', 1466 'TOO_MANY_REQUESTS', 1467 'REQUEST_HEADER_FIELDS_TOO_LARGE', 1468 'UNAVAILABLE_FOR_LEGAL_REASONS', 1469 'INTERNAL_SERVER_ERROR', 1470 'NOT_IMPLEMENTED', 1471 'BAD_GATEWAY', 1472 'SERVICE_UNAVAILABLE', 1473 'GATEWAY_TIMEOUT', 1474 'HTTP_VERSION_NOT_SUPPORTED', 1475 'INSUFFICIENT_STORAGE', 1476 'NOT_EXTENDED', 1477 'NETWORK_AUTHENTICATION_REQUIRED', 1478 'EARLY_HINTS', 1479 'TOO_EARLY' 1480 ] 1481 for const in expected: 1482 with self.subTest(constant=const): 1483 self.assertTrue(hasattr(client, const)) 1484 1485 1486class SourceAddressTest(TestCase): 1487 def setUp(self): 1488 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1489 self.port = socket_helper.bind_port(self.serv) 1490 self.source_port = socket_helper.find_unused_port() 1491 self.serv.listen() 1492 self.conn = None 1493 1494 def tearDown(self): 1495 if self.conn: 1496 self.conn.close() 1497 self.conn = None 1498 self.serv.close() 1499 self.serv = None 1500 1501 def testHTTPConnectionSourceAddress(self): 1502 self.conn = client.HTTPConnection(HOST, self.port, 1503 source_address=('', self.source_port)) 1504 self.conn.connect() 1505 self.assertEqual(self.conn.sock.getsockname()[1], self.source_port) 1506 1507 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1508 'http.client.HTTPSConnection not defined') 1509 def testHTTPSConnectionSourceAddress(self): 1510 self.conn = client.HTTPSConnection(HOST, self.port, 1511 source_address=('', self.source_port)) 1512 # We don't test anything here other than the constructor not barfing as 1513 # this code doesn't deal with setting up an active running SSL server 1514 # for an ssl_wrapped connect() to actually return from. 1515 1516 1517class TimeoutTest(TestCase): 1518 PORT = None 1519 1520 def setUp(self): 1521 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1522 TimeoutTest.PORT = socket_helper.bind_port(self.serv) 1523 self.serv.listen() 1524 1525 def tearDown(self): 1526 self.serv.close() 1527 self.serv = None 1528 1529 def testTimeoutAttribute(self): 1530 # This will prove that the timeout gets through HTTPConnection 1531 # and into the socket. 1532 1533 # default -- use global socket timeout 1534 self.assertIsNone(socket.getdefaulttimeout()) 1535 socket.setdefaulttimeout(30) 1536 try: 1537 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT) 1538 httpConn.connect() 1539 finally: 1540 socket.setdefaulttimeout(None) 1541 self.assertEqual(httpConn.sock.gettimeout(), 30) 1542 httpConn.close() 1543 1544 # no timeout -- do not use global socket default 1545 self.assertIsNone(socket.getdefaulttimeout()) 1546 socket.setdefaulttimeout(30) 1547 try: 1548 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, 1549 timeout=None) 1550 httpConn.connect() 1551 finally: 1552 socket.setdefaulttimeout(None) 1553 self.assertEqual(httpConn.sock.gettimeout(), None) 1554 httpConn.close() 1555 1556 # a value 1557 httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30) 1558 httpConn.connect() 1559 self.assertEqual(httpConn.sock.gettimeout(), 30) 1560 httpConn.close() 1561 1562 1563class PersistenceTest(TestCase): 1564 1565 def test_reuse_reconnect(self): 1566 # Should reuse or reconnect depending on header from server 1567 tests = ( 1568 ('1.0', '', False), 1569 ('1.0', 'Connection: keep-alive\r\n', True), 1570 ('1.1', '', True), 1571 ('1.1', 'Connection: close\r\n', False), 1572 ('1.0', 'Connection: keep-ALIVE\r\n', True), 1573 ('1.1', 'Connection: cloSE\r\n', False), 1574 ) 1575 for version, header, reuse in tests: 1576 with self.subTest(version=version, header=header): 1577 msg = ( 1578 'HTTP/{} 200 OK\r\n' 1579 '{}' 1580 'Content-Length: 12\r\n' 1581 '\r\n' 1582 'Dummy body\r\n' 1583 ).format(version, header) 1584 conn = FakeSocketHTTPConnection(msg) 1585 self.assertIsNone(conn.sock) 1586 conn.request('GET', '/open-connection') 1587 with conn.getresponse() as response: 1588 self.assertEqual(conn.sock is None, not reuse) 1589 response.read() 1590 self.assertEqual(conn.sock is None, not reuse) 1591 self.assertEqual(conn.connections, 1) 1592 conn.request('GET', '/subsequent-request') 1593 self.assertEqual(conn.connections, 1 if reuse else 2) 1594 1595 def test_disconnected(self): 1596 1597 def make_reset_reader(text): 1598 """Return BufferedReader that raises ECONNRESET at EOF""" 1599 stream = io.BytesIO(text) 1600 def readinto(buffer): 1601 size = io.BytesIO.readinto(stream, buffer) 1602 if size == 0: 1603 raise ConnectionResetError() 1604 return size 1605 stream.readinto = readinto 1606 return io.BufferedReader(stream) 1607 1608 tests = ( 1609 (io.BytesIO, client.RemoteDisconnected), 1610 (make_reset_reader, ConnectionResetError), 1611 ) 1612 for stream_factory, exception in tests: 1613 with self.subTest(exception=exception): 1614 conn = FakeSocketHTTPConnection(b'', stream_factory) 1615 conn.request('GET', '/eof-response') 1616 self.assertRaises(exception, conn.getresponse) 1617 self.assertIsNone(conn.sock) 1618 # HTTPConnection.connect() should be automatically invoked 1619 conn.request('GET', '/reconnect') 1620 self.assertEqual(conn.connections, 2) 1621 1622 def test_100_close(self): 1623 conn = FakeSocketHTTPConnection( 1624 b'HTTP/1.1 100 Continue\r\n' 1625 b'\r\n' 1626 # Missing final response 1627 ) 1628 conn.request('GET', '/', headers={'Expect': '100-continue'}) 1629 self.assertRaises(client.RemoteDisconnected, conn.getresponse) 1630 self.assertIsNone(conn.sock) 1631 conn.request('GET', '/reconnect') 1632 self.assertEqual(conn.connections, 2) 1633 1634 1635class HTTPSTest(TestCase): 1636 1637 def setUp(self): 1638 if not hasattr(client, 'HTTPSConnection'): 1639 self.skipTest('ssl support required') 1640 1641 def make_server(self, certfile): 1642 from test.ssl_servers import make_https_server 1643 return make_https_server(self, certfile=certfile) 1644 1645 def test_attributes(self): 1646 # simple test to check it's storing the timeout 1647 h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30) 1648 self.assertEqual(h.timeout, 30) 1649 1650 def test_networked(self): 1651 # Default settings: requires a valid cert from a trusted CA 1652 import ssl 1653 support.requires('network') 1654 with socket_helper.transient_internet('self-signed.pythontest.net'): 1655 h = client.HTTPSConnection('self-signed.pythontest.net', 443) 1656 with self.assertRaises(ssl.SSLError) as exc_info: 1657 h.request('GET', '/') 1658 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1659 1660 def test_networked_noverification(self): 1661 # Switch off cert verification 1662 import ssl 1663 support.requires('network') 1664 with socket_helper.transient_internet('self-signed.pythontest.net'): 1665 context = ssl._create_unverified_context() 1666 h = client.HTTPSConnection('self-signed.pythontest.net', 443, 1667 context=context) 1668 h.request('GET', '/') 1669 resp = h.getresponse() 1670 h.close() 1671 self.assertIn('nginx', resp.getheader('server')) 1672 resp.close() 1673 1674 @support.system_must_validate_cert 1675 def test_networked_trusted_by_default_cert(self): 1676 # Default settings: requires a valid cert from a trusted CA 1677 support.requires('network') 1678 with socket_helper.transient_internet('www.python.org'): 1679 h = client.HTTPSConnection('www.python.org', 443) 1680 h.request('GET', '/') 1681 resp = h.getresponse() 1682 content_type = resp.getheader('content-type') 1683 resp.close() 1684 h.close() 1685 self.assertIn('text/html', content_type) 1686 1687 def test_networked_good_cert(self): 1688 # We feed the server's cert as a validating cert 1689 import ssl 1690 support.requires('network') 1691 selfsigned_pythontestdotnet = 'self-signed.pythontest.net' 1692 with socket_helper.transient_internet(selfsigned_pythontestdotnet): 1693 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1694 self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED) 1695 self.assertEqual(context.check_hostname, True) 1696 context.load_verify_locations(CERT_selfsigned_pythontestdotnet) 1697 try: 1698 h = client.HTTPSConnection(selfsigned_pythontestdotnet, 443, 1699 context=context) 1700 h.request('GET', '/') 1701 resp = h.getresponse() 1702 except ssl.SSLError as ssl_err: 1703 ssl_err_str = str(ssl_err) 1704 # In the error message of [SSL: CERTIFICATE_VERIFY_FAILED] on 1705 # modern Linux distros (Debian Buster, etc) default OpenSSL 1706 # configurations it'll fail saying "key too weak" until we 1707 # address https://bugs.python.org/issue36816 to use a proper 1708 # key size on self-signed.pythontest.net. 1709 if re.search(r'(?i)key.too.weak', ssl_err_str): 1710 raise unittest.SkipTest( 1711 f'Got {ssl_err_str} trying to connect ' 1712 f'to {selfsigned_pythontestdotnet}. ' 1713 'See https://bugs.python.org/issue36816.') 1714 raise 1715 server_string = resp.getheader('server') 1716 resp.close() 1717 h.close() 1718 self.assertIn('nginx', server_string) 1719 1720 def test_networked_bad_cert(self): 1721 # We feed a "CA" cert that is unrelated to the server's cert 1722 import ssl 1723 support.requires('network') 1724 with socket_helper.transient_internet('self-signed.pythontest.net'): 1725 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1726 context.load_verify_locations(CERT_localhost) 1727 h = client.HTTPSConnection('self-signed.pythontest.net', 443, context=context) 1728 with self.assertRaises(ssl.SSLError) as exc_info: 1729 h.request('GET', '/') 1730 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1731 1732 def test_local_unknown_cert(self): 1733 # The custom cert isn't known to the default trust bundle 1734 import ssl 1735 server = self.make_server(CERT_localhost) 1736 h = client.HTTPSConnection('localhost', server.port) 1737 with self.assertRaises(ssl.SSLError) as exc_info: 1738 h.request('GET', '/') 1739 self.assertEqual(exc_info.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 1740 1741 def test_local_good_hostname(self): 1742 # The (valid) cert validates the HTTP hostname 1743 import ssl 1744 server = self.make_server(CERT_localhost) 1745 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1746 context.load_verify_locations(CERT_localhost) 1747 h = client.HTTPSConnection('localhost', server.port, context=context) 1748 self.addCleanup(h.close) 1749 h.request('GET', '/nonexistent') 1750 resp = h.getresponse() 1751 self.addCleanup(resp.close) 1752 self.assertEqual(resp.status, 404) 1753 1754 def test_local_bad_hostname(self): 1755 # The (valid) cert doesn't validate the HTTP hostname 1756 import ssl 1757 server = self.make_server(CERT_fakehostname) 1758 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1759 context.load_verify_locations(CERT_fakehostname) 1760 h = client.HTTPSConnection('localhost', server.port, context=context) 1761 with self.assertRaises(ssl.CertificateError): 1762 h.request('GET', '/') 1763 # Same with explicit check_hostname=True 1764 with support.check_warnings(('', DeprecationWarning)): 1765 h = client.HTTPSConnection('localhost', server.port, 1766 context=context, check_hostname=True) 1767 with self.assertRaises(ssl.CertificateError): 1768 h.request('GET', '/') 1769 # With check_hostname=False, the mismatching is ignored 1770 context.check_hostname = False 1771 with support.check_warnings(('', DeprecationWarning)): 1772 h = client.HTTPSConnection('localhost', server.port, 1773 context=context, check_hostname=False) 1774 h.request('GET', '/nonexistent') 1775 resp = h.getresponse() 1776 resp.close() 1777 h.close() 1778 self.assertEqual(resp.status, 404) 1779 # The context's check_hostname setting is used if one isn't passed to 1780 # HTTPSConnection. 1781 context.check_hostname = False 1782 h = client.HTTPSConnection('localhost', server.port, context=context) 1783 h.request('GET', '/nonexistent') 1784 resp = h.getresponse() 1785 self.assertEqual(resp.status, 404) 1786 resp.close() 1787 h.close() 1788 # Passing check_hostname to HTTPSConnection should override the 1789 # context's setting. 1790 with support.check_warnings(('', DeprecationWarning)): 1791 h = client.HTTPSConnection('localhost', server.port, 1792 context=context, check_hostname=True) 1793 with self.assertRaises(ssl.CertificateError): 1794 h.request('GET', '/') 1795 1796 @unittest.skipIf(not hasattr(client, 'HTTPSConnection'), 1797 'http.client.HTTPSConnection not available') 1798 def test_host_port(self): 1799 # Check invalid host_port 1800 1801 for hp in ("www.python.org:abc", "user:password@www.python.org"): 1802 self.assertRaises(client.InvalidURL, client.HTTPSConnection, hp) 1803 1804 for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000", 1805 "fe80::207:e9ff:fe9b", 8000), 1806 ("www.python.org:443", "www.python.org", 443), 1807 ("www.python.org:", "www.python.org", 443), 1808 ("www.python.org", "www.python.org", 443), 1809 ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 443), 1810 ("[fe80::207:e9ff:fe9b]:", "fe80::207:e9ff:fe9b", 1811 443)): 1812 c = client.HTTPSConnection(hp) 1813 self.assertEqual(h, c.host) 1814 self.assertEqual(p, c.port) 1815 1816 def test_tls13_pha(self): 1817 import ssl 1818 if not ssl.HAS_TLSv1_3: 1819 self.skipTest('TLS 1.3 support required') 1820 # just check status of PHA flag 1821 h = client.HTTPSConnection('localhost', 443) 1822 self.assertTrue(h._context.post_handshake_auth) 1823 1824 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1825 self.assertFalse(context.post_handshake_auth) 1826 h = client.HTTPSConnection('localhost', 443, context=context) 1827 self.assertIs(h._context, context) 1828 self.assertFalse(h._context.post_handshake_auth) 1829 1830 with warnings.catch_warnings(): 1831 warnings.filterwarnings('ignore', 'key_file, cert_file and check_hostname are deprecated', 1832 DeprecationWarning) 1833 h = client.HTTPSConnection('localhost', 443, context=context, 1834 cert_file=CERT_localhost) 1835 self.assertTrue(h._context.post_handshake_auth) 1836 1837 1838class RequestBodyTest(TestCase): 1839 """Test cases where a request includes a message body.""" 1840 1841 def setUp(self): 1842 self.conn = client.HTTPConnection('example.com') 1843 self.conn.sock = self.sock = FakeSocket("") 1844 self.conn.sock = self.sock 1845 1846 def get_headers_and_fp(self): 1847 f = io.BytesIO(self.sock.data) 1848 f.readline() # read the request line 1849 message = client.parse_headers(f) 1850 return message, f 1851 1852 def test_list_body(self): 1853 # Note that no content-length is automatically calculated for 1854 # an iterable. The request will fall back to send chunked 1855 # transfer encoding. 1856 cases = ( 1857 ([b'foo', b'bar'], b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1858 ((b'foo', b'bar'), b'3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n'), 1859 ) 1860 for body, expected in cases: 1861 with self.subTest(body): 1862 self.conn = client.HTTPConnection('example.com') 1863 self.conn.sock = self.sock = FakeSocket('') 1864 1865 self.conn.request('PUT', '/url', body) 1866 msg, f = self.get_headers_and_fp() 1867 self.assertNotIn('Content-Type', msg) 1868 self.assertNotIn('Content-Length', msg) 1869 self.assertEqual(msg.get('Transfer-Encoding'), 'chunked') 1870 self.assertEqual(expected, f.read()) 1871 1872 def test_manual_content_length(self): 1873 # Set an incorrect content-length so that we can verify that 1874 # it will not be over-ridden by the library. 1875 self.conn.request("PUT", "/url", "body", 1876 {"Content-Length": "42"}) 1877 message, f = self.get_headers_and_fp() 1878 self.assertEqual("42", message.get("content-length")) 1879 self.assertEqual(4, len(f.read())) 1880 1881 def test_ascii_body(self): 1882 self.conn.request("PUT", "/url", "body") 1883 message, f = self.get_headers_and_fp() 1884 self.assertEqual("text/plain", message.get_content_type()) 1885 self.assertIsNone(message.get_charset()) 1886 self.assertEqual("4", message.get("content-length")) 1887 self.assertEqual(b'body', f.read()) 1888 1889 def test_latin1_body(self): 1890 self.conn.request("PUT", "/url", "body\xc1") 1891 message, f = self.get_headers_and_fp() 1892 self.assertEqual("text/plain", message.get_content_type()) 1893 self.assertIsNone(message.get_charset()) 1894 self.assertEqual("5", message.get("content-length")) 1895 self.assertEqual(b'body\xc1', f.read()) 1896 1897 def test_bytes_body(self): 1898 self.conn.request("PUT", "/url", b"body\xc1") 1899 message, f = self.get_headers_and_fp() 1900 self.assertEqual("text/plain", message.get_content_type()) 1901 self.assertIsNone(message.get_charset()) 1902 self.assertEqual("5", message.get("content-length")) 1903 self.assertEqual(b'body\xc1', f.read()) 1904 1905 def test_text_file_body(self): 1906 self.addCleanup(support.unlink, support.TESTFN) 1907 with open(support.TESTFN, "w") as f: 1908 f.write("body") 1909 with open(support.TESTFN) as f: 1910 self.conn.request("PUT", "/url", f) 1911 message, f = self.get_headers_and_fp() 1912 self.assertEqual("text/plain", message.get_content_type()) 1913 self.assertIsNone(message.get_charset()) 1914 # No content-length will be determined for files; the body 1915 # will be sent using chunked transfer encoding instead. 1916 self.assertIsNone(message.get("content-length")) 1917 self.assertEqual("chunked", message.get("transfer-encoding")) 1918 self.assertEqual(b'4\r\nbody\r\n0\r\n\r\n', f.read()) 1919 1920 def test_binary_file_body(self): 1921 self.addCleanup(support.unlink, support.TESTFN) 1922 with open(support.TESTFN, "wb") as f: 1923 f.write(b"body\xc1") 1924 with open(support.TESTFN, "rb") as f: 1925 self.conn.request("PUT", "/url", f) 1926 message, f = self.get_headers_and_fp() 1927 self.assertEqual("text/plain", message.get_content_type()) 1928 self.assertIsNone(message.get_charset()) 1929 self.assertEqual("chunked", message.get("Transfer-Encoding")) 1930 self.assertNotIn("Content-Length", message) 1931 self.assertEqual(b'5\r\nbody\xc1\r\n0\r\n\r\n', f.read()) 1932 1933 1934class HTTPResponseTest(TestCase): 1935 1936 def setUp(self): 1937 body = "HTTP/1.1 200 Ok\r\nMy-Header: first-value\r\nMy-Header: \ 1938 second-value\r\n\r\nText" 1939 sock = FakeSocket(body) 1940 self.resp = client.HTTPResponse(sock) 1941 self.resp.begin() 1942 1943 def test_getting_header(self): 1944 header = self.resp.getheader('My-Header') 1945 self.assertEqual(header, 'first-value, second-value') 1946 1947 header = self.resp.getheader('My-Header', 'some default') 1948 self.assertEqual(header, 'first-value, second-value') 1949 1950 def test_getting_nonexistent_header_with_string_default(self): 1951 header = self.resp.getheader('No-Such-Header', 'default-value') 1952 self.assertEqual(header, 'default-value') 1953 1954 def test_getting_nonexistent_header_with_iterable_default(self): 1955 header = self.resp.getheader('No-Such-Header', ['default', 'values']) 1956 self.assertEqual(header, 'default, values') 1957 1958 header = self.resp.getheader('No-Such-Header', ('default', 'values')) 1959 self.assertEqual(header, 'default, values') 1960 1961 def test_getting_nonexistent_header_without_default(self): 1962 header = self.resp.getheader('No-Such-Header') 1963 self.assertEqual(header, None) 1964 1965 def test_getting_header_defaultint(self): 1966 header = self.resp.getheader('No-Such-Header',default=42) 1967 self.assertEqual(header, 42) 1968 1969class TunnelTests(TestCase): 1970 def setUp(self): 1971 response_text = ( 1972 'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT 1973 'HTTP/1.1 200 OK\r\n' # Reply to HEAD 1974 'Content-Length: 42\r\n\r\n' 1975 ) 1976 self.host = 'proxy.com' 1977 self.conn = client.HTTPConnection(self.host) 1978 self.conn._create_connection = self._create_connection(response_text) 1979 1980 def tearDown(self): 1981 self.conn.close() 1982 1983 def _create_connection(self, response_text): 1984 def create_connection(address, timeout=None, source_address=None): 1985 return FakeSocket(response_text, host=address[0], port=address[1]) 1986 return create_connection 1987 1988 def test_set_tunnel_host_port_headers(self): 1989 tunnel_host = 'destination.com' 1990 tunnel_port = 8888 1991 tunnel_headers = {'User-Agent': 'Mozilla/5.0 (compatible, MSIE 11)'} 1992 self.conn.set_tunnel(tunnel_host, port=tunnel_port, 1993 headers=tunnel_headers) 1994 self.conn.request('HEAD', '/', '') 1995 self.assertEqual(self.conn.sock.host, self.host) 1996 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 1997 self.assertEqual(self.conn._tunnel_host, tunnel_host) 1998 self.assertEqual(self.conn._tunnel_port, tunnel_port) 1999 self.assertEqual(self.conn._tunnel_headers, tunnel_headers) 2000 2001 def test_disallow_set_tunnel_after_connect(self): 2002 # Once connected, we shouldn't be able to tunnel anymore 2003 self.conn.connect() 2004 self.assertRaises(RuntimeError, self.conn.set_tunnel, 2005 'destination.com') 2006 2007 def test_connect_with_tunnel(self): 2008 self.conn.set_tunnel('destination.com') 2009 self.conn.request('HEAD', '/', '') 2010 self.assertEqual(self.conn.sock.host, self.host) 2011 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2012 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2013 # issue22095 2014 self.assertNotIn(b'Host: destination.com:None', self.conn.sock.data) 2015 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2016 2017 # This test should be removed when CONNECT gets the HTTP/1.1 blessing 2018 self.assertNotIn(b'Host: proxy.com', self.conn.sock.data) 2019 2020 def test_connect_put_request(self): 2021 self.conn.set_tunnel('destination.com') 2022 self.conn.request('PUT', '/', '') 2023 self.assertEqual(self.conn.sock.host, self.host) 2024 self.assertEqual(self.conn.sock.port, client.HTTP_PORT) 2025 self.assertIn(b'CONNECT destination.com', self.conn.sock.data) 2026 self.assertIn(b'Host: destination.com', self.conn.sock.data) 2027 2028 def test_tunnel_debuglog(self): 2029 expected_header = 'X-Dummy: 1' 2030 response_text = 'HTTP/1.0 200 OK\r\n{}\r\n\r\n'.format(expected_header) 2031 2032 self.conn.set_debuglevel(1) 2033 self.conn._create_connection = self._create_connection(response_text) 2034 self.conn.set_tunnel('destination.com') 2035 2036 with support.captured_stdout() as output: 2037 self.conn.request('PUT', '/', '') 2038 lines = output.getvalue().splitlines() 2039 self.assertIn('header: {}'.format(expected_header), lines) 2040 2041 2042if __name__ == '__main__': 2043 unittest.main(verbosity=2) 2044