# HTTPS tests for httplib2: certificate verification, redirects over TLS,
# TLS min/max version handling, client certificates and SNI.
import socket
import ssl

import httplib2
import pytest
from six.moves import urllib

import tests


def test_get_via_https():
    # Test that we can handle HTTPS
    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_const_http(tls=True) as uri:
        response, _ = http.request(uri, "GET")
        assert response.status == 200


def test_get_301_via_https():
    # Test that an absolute-Location 301 is followed over HTTPS and that
    # the redirect response is kept as `response.previous`.
    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    # One-element list so the nested handler can see the value assigned
    # later (stand-in for `nonlocal`); the final URL is only known once
    # the server context has yielded its uri.
    glocation = [""]  # nonlocal kind of trick, maybe redundant

    def handler(request):
        if request.uri == "/final":
            return tests.http_response_bytes(body=b"final")
        return tests.http_response_bytes(status="301 goto", headers={"location": glocation[0]})

    with tests.server_request(handler, request_count=2, tls=True) as uri:
        glocation[0] = urllib.parse.urljoin(uri, "/final")
        response, content = http.request(uri, "GET")
        assert response.status == 200
        assert content == b"final"
        assert response.previous.status == 301
        assert response.previous["location"] == glocation[0]


def test_get_301_via_https_spec_violation_on_location():
    # Test that we follow redirects through HTTPS
    # even if they violate the spec by including
    # a relative Location: header instead of an absolute one.
    http = httplib2.Http(ca_certs=tests.CA_CERTS)

    def handler(request):
        if request.uri == "/final":
            return tests.http_response_bytes(body=b"final")
        return tests.http_response_bytes(status="301 goto", headers={"location": "/final"})

    with tests.server_request(handler, request_count=2, tls=True) as uri:
        response, content = http.request(uri, "GET")
        assert response.status == 200
        assert content == b"final"
        assert response.previous.status == 301


def test_invalid_ca_certs_path():
    # Test that a request with a non-existent CA bundle path raises IOError.
    http = httplib2.Http(ca_certs="/nosuchfile")
    # request_count=0: presumably the server is not expected to serve any
    # request here -- confirm against the tests helper.
    with tests.server_const_http(request_count=0, tls=True) as uri:
        with tests.assert_raises(IOError):
            http.request(uri, "GET")


def test_not_trusted_ca():
    # Test that we get a SSLHandshakeError if we try to access
    # server using a CA cert file that doesn't contain server's CA.
    http = httplib2.Http(ca_certs=tests.CA_UNUSED_CERTS)
    with tests.server_const_http(tls=True) as uri:
        # Only the request is guarded; the "did not raise" failure lives in
        # the else branch so it can never be mistaken for a handled error.
        try:
            http.request(uri, "GET")
        except ssl.SSLError as e:
            assert e.reason == "CERTIFICATE_VERIFY_FAILED"
        except httplib2.SSLHandshakeError:  # Python2
            pass
        else:
            assert False, "expected CERTIFICATE_VERIFY_FAILED"


@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "minimum_version"),
    reason="ssl doesn't support TLS min/max",
)
def test_set_min_tls_version():
    # Test setting minimum TLS version
    # We expect failure on Python < 3.7 or OpenSSL < 1.1
    #
    # Probe the class rather than an instance: calling ssl.SSLContext()
    # without a protocol is deprecated since Python 3.10 and the attribute
    # is defined at class level anyway.
    expect_success = hasattr(ssl.SSLContext, "minimum_version")
    try:
        http = httplib2.Http(tls_minimum_version="TLSv1_2")
        http.request(tests.DUMMY_HTTPS_URL)
    except RuntimeError:
        assert not expect_success
    except socket.error:
        assert expect_success


@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "maximum_version"),
    reason="ssl doesn't support TLS min/max",
)
def test_set_max_tls_version():
    # Test setting maximum TLS version
    # We expect RuntimeError on Python < 3.7 or OpenSSL < 1.1
    # We expect socket error otherwise
    #
    # Probe the class rather than an instance: calling ssl.SSLContext()
    # without a protocol is deprecated since Python 3.10 and the attribute
    # is defined at class level anyway.
    expect_success = hasattr(ssl.SSLContext, "maximum_version")
    try:
        http = httplib2.Http(tls_maximum_version="TLSv1_2")
        http.request(tests.DUMMY_HTTPS_URL)
    except RuntimeError:
        assert not expect_success
    except socket.error:
        assert expect_success


@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "minimum_version"),
    reason="ssl doesn't support TLS min/max",
)
def test_min_tls_version():
    # Test that a client requiring at least TLSv1.2 refuses to talk to a
    # server whose context is created with PROTOCOL_TLSv1_1.
    def setup_tls(context, server, skip_errors):
        # Presumably tells the test server to tolerate this error on its
        # side of the failed handshake -- confirm against the tests helper.
        skip_errors.append("WRONG_VERSION_NUMBER")
        # The context passed in is deliberately replaced by a TLSv1.1 one.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
        context.load_cert_chain(tests.SERVER_CHAIN)
        return context.wrap_socket(server, server_side=True)

    http = httplib2.Http(ca_certs=tests.CA_CERTS, tls_minimum_version="TLSv1_2")
    with tests.server_const_http(tls=setup_tls) as uri:
        try:
            http.request(uri)
        except ssl.SSLError as e:
            assert e.reason in ("UNSUPPORTED_PROTOCOL", "VERSION_TOO_LOW")
        else:
            assert False, "expected SSLError"


@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "maximum_version"),
    reason="ssl doesn't support TLS min/max",
)
def test_max_tls_version():
    # Test that capping the client at TLSv1 yields a TLSv1.0 connection.
    http = httplib2.Http(ca_certs=tests.CA_CERTS, tls_maximum_version="TLSv1")
    with tests.server_const_http(tls=True) as uri:
        http.request(uri)
        # Inspect the socket of a connection cached on the Http object;
        # cipher() returns a (name, protocol version, secret bits) triple.
        _, tls_ver, _ = http.connections.popitem()[1].sock.cipher()
        assert tls_ver == "TLSv1.0"


def test_client_cert_verified():
    # Test that the client certificate registered with add_certificate()
    # is the one the server sees when it requires a peer certificate.
    cert_log = []

    def setup_tls(context, server, skip_errors):
        context.load_verify_locations(cafile=tests.CA_CERTS)
        context.verify_mode = ssl.CERT_REQUIRED
        return context.wrap_socket(server, server_side=True)

    def handler(request):
        cert_log.append(request.client_sock.getpeercert())
        return tests.http_response_bytes()

    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_request(handler, tls=setup_tls) as uri:
        uri_parsed = urllib.parse.urlparse(uri)
        http.add_certificate(tests.CLIENT_PEM, tests.CLIENT_PEM, uri_parsed.netloc)
        http.request(uri)

    assert len(cert_log) == 1
    # TODO extract serial from tests.CLIENT_PEM
    assert cert_log[0]["serialNumber"] == "E2AA6A96D1BF1AEC"


def test_client_cert_password_verified():
    # Same as test_client_cert_verified, but the client key/cert file is
    # password-protected and the password is passed to add_certificate().
    cert_log = []

    def setup_tls(context, server, skip_errors):
        context.load_verify_locations(cafile=tests.CA_CERTS)
        context.verify_mode = ssl.CERT_REQUIRED
        return context.wrap_socket(server, server_side=True)

    def handler(request):
        cert_log.append(request.client_sock.getpeercert())
        return tests.http_response_bytes()

    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_request(handler, tls=setup_tls) as uri:
        uri_parsed = urllib.parse.urlparse(uri)
        http.add_certificate(tests.CLIENT_ENCRYPTED_PEM, tests.CLIENT_ENCRYPTED_PEM,
                             uri_parsed.netloc, password="12345")
        http.request(uri)

    assert len(cert_log) == 1
    # TODO extract serial from tests.CLIENT_ENCRYPTED_PEM
    assert cert_log[0]["serialNumber"] == "E2AA6A96D1BF1AED"


@pytest.mark.skipif(
    not hasattr(tests.ssl_context(), "set_servername_callback"),
    reason="SSLContext.set_servername_callback is not available",
)
def test_sni_set_servername_callback():
    # Test that the client sends the request hostname via SNI: the server
    # side records every hostname handed to its servername callback.
    sni_log = []

    def setup_tls(context, server, skip_errors):
        context.set_servername_callback(lambda _sock, hostname, _context: sni_log.append(hostname))
        return context.wrap_socket(server, server_side=True)

    http = httplib2.Http(ca_certs=tests.CA_CERTS)
    with tests.server_const_http(tls=setup_tls) as uri:
        uri_parsed = urllib.parse.urlparse(uri)
        http.request(uri)
        assert sni_log == [uri_parsed.hostname]