# Copyright 2007,2011 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This file is derived from
# http://googleappengine.googlecode.com/svn-history/r136/trunk/python/google/appengine/tools/https_wrapper.py


"""Extensions to allow HTTPS requests with SSL certificate validation."""

import re
import socket
import ssl

import boto

from boto.compat import six, http_client


class InvalidCertificateException(http_client.HTTPException):
    """Raised when a certificate is provided with an invalid hostname."""

    def __init__(self, host, cert, reason):
        """Constructor.

        Args:
          host: The hostname the connection was made to.
          cert: The SSL certificate (as a dictionary) the host returned.
          reason: Human-readable explanation of why the certificate was
              rejected; included in the string form of the exception.
        """
        http_client.HTTPException.__init__(self)
        self.host = host
        self.cert = cert
        self.reason = reason

    def __str__(self):
        return ('Host %s returned an invalid certificate (%s): %s' %
                (self.host, self.reason, self.cert))


def GetValidHostsForCert(cert):
    """Returns a list of valid host globs for an SSL certificate.

    When the certificate carries a subjectAltName extension only its DNS
    entries are used; the subject commonName is consulted only when the
    extension is absent altogether.

    Args:
      cert: A dictionary representing an SSL certificate.
    Returns:
      list: A list of valid host globs (possibly empty).
    """
    if 'subjectAltName' in cert:
        return [value for kind, value in cert['subjectAltName']
                if kind.lower() == 'dns']
    # Each subject entry is a sequence of (key, value) pairs; as before,
    # only the first pair of each entry is inspected.  A certificate
    # dictionary without a 'subject' key simply yields no hosts instead of
    # raising KeyError.
    return [rdn[0][1] for rdn in cert.get('subject', ())
            if rdn[0][0].lower() == 'commonname']


def _HostGlobToRegex(host_glob):
    """Translates a certificate host glob into an anchored regex string.

    Every literal part is escaped with re.escape, so only '*' has a special
    meaning: it matches any run of characters that does not contain a dot.
    """
    literal_parts = [re.escape(part) for part in host_glob.split('*')]
    # \A and \Z anchor at the real string boundaries; unlike '$', \Z does
    # not match before a trailing newline.
    return r'\A' + '[^.]*'.join(literal_parts) + r'\Z'


def ValidateCertificateHostname(cert, hostname):
    """Validates that a given hostname is valid for an SSL certificate.

    The comparison is case-insensitive.

    Args:
      cert: A dictionary representing an SSL certificate.
      hostname: The hostname to test.
    Returns:
      bool: Whether or not the hostname is valid for this certificate.
    """
    hosts = GetValidHostsForCert(cert)
    boto.log.debug(
        "validating server certificate: hostname=%s, certificate hosts=%s",
        hostname, hosts)
    for host in hosts:
        if re.match(_HostGlobToRegex(host), hostname, re.I):
            return True
    return False


class CertValidatingHTTPSConnection(http_client.HTTPConnection):
    """An HTTPConnection that connects over SSL and validates certificates."""

    default_port = http_client.HTTPS_PORT

    def __init__(self, host, port=default_port, key_file=None, cert_file=None,
                 ca_certs=None, strict=None, **kwargs):
        """Constructor.

        Args:
          host: The hostname. Can be in 'host:port' form.
          port: The port. Defaults to 443.
          key_file: A file containing the client's private key
          cert_file: A file containing the client's certificates
          ca_certs: A file containing a set of concatenated certificate
              authority certs for validating the server against.
          strict: When true, causes BadStatusLine to be raised if the status
              line can't be parsed as a valid HTTP/1.0 or 1.1 status line.
              Only forwarded to HTTPConnection on Python 2.
          **kwargs: Additional keyword arguments passed through to
              HTTPConnection.__init__.
        """
        if six.PY2:
            # Python 3.2 and newer have deprecated and removed the strict
            # parameter. Since the params are supported as keyword arguments
            # we conditionally add it here.
            kwargs['strict'] = strict

        http_client.HTTPConnection.__init__(self, host=host, port=port,
                                            **kwargs)
        self.key_file = key_file
        self.cert_file = cert_file
        self.ca_certs = ca_certs

    def _wrap_socket(self, sock):
        """Wraps a connected socket in TLS with mandatory cert verification.

        Uses the legacy ssl.wrap_socket() where the running Python still
        provides it, and an SSLContext otherwise (ssl.wrap_socket() was
        removed in Python 3.12).
        """
        if hasattr(ssl, 'wrap_socket'):
            return ssl.wrap_socket(sock, keyfile=self.key_file,
                                   certfile=self.cert_file,
                                   cert_reqs=ssl.CERT_REQUIRED,
                                   ca_certs=self.ca_certs)

        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # Hostname checking is done by ValidateCertificateHostname() in
        # connect() so that callers keep receiving
        # InvalidCertificateException on a mismatch.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_REQUIRED
        if self.ca_certs:
            context.load_verify_locations(cafile=self.ca_certs)
        else:
            context.load_default_certs()
        if self.cert_file:
            context.load_cert_chain(self.cert_file, self.key_file)
        return context.wrap_socket(sock, server_hostname=self.host)

    def connect(self):
        """Connect to a host on a given (SSL) port.

        Raises:
          InvalidCertificateException: if the certificate presented by the
              server is not valid for the host being connected to.
        """
        if hasattr(self, "timeout"):
            sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        else:
            sock = socket.create_connection((self.host, self.port))
        msg = "wrapping ssl socket; "
        if self.ca_certs:
            msg += "CA certificate file=%s" % self.ca_certs
        else:
            msg += "using system provided SSL certs"
        boto.log.debug(msg)

        try:
            ssl_sock = self._wrap_socket(sock)
        except Exception:
            # Do not leak the raw socket when the TLS handshake fails.
            sock.close()
            raise

        # HTTPConnection.__init__ has already separated any ':port' suffix
        # from the host, so self.host is the bare hostname.
        hostname = self.host
        try:
            cert = ssl_sock.getpeercert()
            valid = ValidateCertificateHostname(cert, hostname)
        except Exception:
            ssl_sock.close()
            raise
        if not valid:
            ssl_sock.close()
            raise InvalidCertificateException(hostname,
                                              cert,
                                              'remote hostname "%s" does not '
                                              'match certificate' % hostname)
        # Publish the socket only after the certificate has been validated,
        # so a failed validation never leaves a usable connection behind.
        self.sock = ssl_sock