1#!/usr/bin/env python 2# Copyright 2012 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import mock 17import unittest 18 19import datetime 20import dnsproxy 21import httparchive 22import httpclient 23import platformsettings 24import script_injector 25import test_utils 26 27 28class RealHttpFetchTest(unittest.TestCase): 29 30 # Initialize test data 31 CONTENT_TYPE = 'content-type: image/x-icon' 32 COOKIE_1 = ('Set-Cookie: GMAIL_IMP=EXPIRED; ' 33 'Expires=Thu, 12-Jul-2012 22:41:22 GMT; ' 34 'Path=/mail; Secure') 35 COOKIE_2 = ('Set-Cookie: GMAIL_STAT_205a=EXPIRED; ' 36 'Expires=Thu, 12-Jul-2012 22:42:24 GMT; ' 37 'Path=/mail; Secure') 38 FIRST_LINE = 'fake-header: first line' 39 SECOND_LINE = ' second line' 40 THIRD_LINE = '\tthird line' 41 BAD_HEADER = 'this is a bad header' 42 43 def test__GetHeaderNameValueBasic(self): 44 """Test _GetHeaderNameValue with normal header.""" 45 46 real_http_fetch = httpclient.RealHttpFetch 47 name_value = real_http_fetch._GetHeaderNameValue(self.CONTENT_TYPE) 48 self.assertEqual(name_value, ('content-type', 'image/x-icon')) 49 50 def test__GetHeaderNameValueLowercasesName(self): 51 """_GetHeaderNameValue lowercases header name.""" 52 53 real_http_fetch = httpclient.RealHttpFetch 54 header = 'X-Google-Gfe-Backend-Request-Info: eid=1KMAUMeiK4eMiAL52YyMBg' 55 expected = ('x-google-gfe-backend-request-info', 56 'eid=1KMAUMeiK4eMiAL52YyMBg') 57 name_value = real_http_fetch._GetHeaderNameValue(header) 58 self.assertEqual(name_value, expected) 59 60 def test__GetHeaderNameValueBadLineGivesNone(self): 61 """_GetHeaderNameValue returns None for a header in wrong format.""" 62 63 real_http_fetch = httpclient.RealHttpFetch 64 name_value = real_http_fetch._GetHeaderNameValue(self.BAD_HEADER) 65 self.assertIsNone(name_value) 66 67 def test__ToTuplesBasic(self): 68 """Test _ToTuples with normal input.""" 69 70 real_http_fetch = httpclient.RealHttpFetch 71 headers = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE] 72 result = real_http_fetch._ToTuples(headers) 73 expected = [('content-type', 'image/x-icon'), 74 ('set-cookie', self.COOKIE_1[12:]), 75 ('fake-header', 'first line')] 76 self.assertEqual(result, expected) 77 78 def test__ToTuplesMultipleHeadersWithSameName(self): 79 """Test mulitple headers with the same name.""" 80 81 real_http_fetch = httpclient.RealHttpFetch 82 headers = [self.CONTENT_TYPE, self.COOKIE_1, self.COOKIE_2, self.FIRST_LINE] 83 result = real_http_fetch._ToTuples(headers) 84 expected = [('content-type', 'image/x-icon'), 85 ('set-cookie', self.COOKIE_1[12:]), 86 ('set-cookie', self.COOKIE_2[12:]), 87 ('fake-header', 'first line')] 88 self.assertEqual(result, expected) 89 90 def test__ToTuplesAppendsContinuationLine(self): 91 """Test continuation line is handled.""" 92 93 real_http_fetch = httpclient.RealHttpFetch 94 headers = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE, 95 self.SECOND_LINE, self.THIRD_LINE] 96 result = real_http_fetch._ToTuples(headers) 97 expected = [('content-type', 'image/x-icon'), 98 ('set-cookie', self.COOKIE_1[12:]), 99 ('fake-header', 'first line\n second line\n third line')] 100 self.assertEqual(result, expected) 101 102 def test__ToTuplesIgnoresBadHeader(self): 103 """Test bad header is ignored.""" 104 105 real_http_fetch = httpclient.RealHttpFetch 106 bad_headers = [self.CONTENT_TYPE, self.BAD_HEADER, self.COOKIE_1] 107 expected = [('content-type', 'image/x-icon'), 108 ('set-cookie', self.COOKIE_1[12:])] 109 result = real_http_fetch._ToTuples(bad_headers) 110 self.assertEqual(result, expected) 111 112 def test__ToTuplesIgnoresMisplacedContinuationLine(self): 113 """Test misplaced continuation line is ignored.""" 114 115 real_http_fetch = httpclient.RealHttpFetch 116 misplaced_headers = [self.THIRD_LINE, self.CONTENT_TYPE, 117 self.COOKIE_1, self.FIRST_LINE, self.SECOND_LINE] 118 result = real_http_fetch._ToTuples(misplaced_headers) 119 expected = [('content-type', 'image/x-icon'), 120 ('set-cookie', self.COOKIE_1[12:]), 121 ('fake-header', 'first line\n second line')] 122 self.assertEqual(result, expected) 123 124 125class RealHttpFetchGetConnectionTest(unittest.TestCase): 126 """Test that a connection is made with request IP/port or proxy IP/port.""" 127 128 def setUp(self): 129 def real_dns_lookup(host): 130 return { 131 'example.com': '127.127.127.127', 132 'proxy.com': '2.2.2.2', 133 }[host] 134 self.fetch = httpclient.RealHttpFetch(real_dns_lookup) 135 self.https_proxy = None 136 self.http_proxy = None 137 def get_proxy(is_ssl): 138 return self.https_proxy if is_ssl else self.http_proxy 139 self.fetch._get_system_proxy = get_proxy 140 141 def set_http_proxy(self, host, port): 142 self.http_proxy = platformsettings.SystemProxy(host, port) 143 144 def set_https_proxy(self, host, port): 145 self.https_proxy = platformsettings.SystemProxy(host, port) 146 147 def test_get_connection_without_proxy_connects_to_host_ip(self): 148 """HTTP connection with no proxy connects to host IP.""" 149 self.set_http_proxy(host=None, port=None) 150 connection = self.fetch._get_connection('example.com', None, is_ssl=False) 151 self.assertEqual('127.127.127.127', connection.host) 152 self.assertEqual(80, connection.port) # default HTTP port 153 154 def test_get_connection_without_proxy_uses_nondefault_request_port(self): 155 """HTTP connection with no proxy connects with request port.""" 156 self.set_https_proxy(host=None, port=None) 157 connection = self.fetch._get_connection('example.com', 8888, is_ssl=False) 158 self.assertEqual('127.127.127.127', connection.host) 159 self.assertEqual(8888, connection.port) # request HTTP port 160 161 def test_get_connection_with_proxy_uses_proxy_port(self): 162 """HTTP connection with proxy connects used proxy port.""" 163 self.set_http_proxy(host='proxy.com', port=None) 164 connection = self.fetch._get_connection('example.com', 8888, is_ssl=False) 165 self.assertEqual('2.2.2.2', connection.host) # proxy IP 166 self.assertEqual(80, connection.port) # proxy port (default HTTP) 167 168 def test_ssl_get_connection_without_proxy_connects_to_host_ip(self): 169 """HTTPS (SSL) connection with no proxy connects to host IP.""" 170 self.set_https_proxy(host=None, port=None) 171 connection = self.fetch._get_connection('example.com', None, is_ssl=True) 172 self.assertEqual('127.127.127.127', connection.host) 173 self.assertEqual(443, connection.port) # default SSL port 174 175 def test_ssl_get_connection_with_proxy_connects_to_proxy_ip(self): 176 """HTTPS (SSL) connection with proxy connects to proxy IP.""" 177 self.set_https_proxy(host='proxy.com', port=8443) 178 connection = self.fetch._get_connection('example.com', None, is_ssl=True) 179 self.assertEqual('2.2.2.2', connection.host) # proxy IP 180 self.assertEqual(8443, connection.port) # SSL proxy port 181 182 def test_ssl_get_connection_with_proxy_tunnels_to_host(self): 183 """HTTPS (SSL) connection with proxy tunnels to target host.""" 184 self.set_https_proxy(host='proxy.com', port=8443) 185 connection = self.fetch._get_connection('example.com', 9443, is_ssl=True) 186 self.assertEqual('example.com', connection._tunnel_host) # host name 187 self.assertEqual(9443, connection._tunnel_port) # host port 188 189 190class ActualNetworkFetchTest(test_utils.RealNetworkFetchTest): 191 192 def testFetchNonSSLRequest(self): 193 real_dns_lookup = dnsproxy.RealDnsLookup( 194 name_servers=[platformsettings.get_original_primary_nameserver()], 195 dns_forwarding=False, proxy_host='127.0.0.1', proxy_port=5353) 196 fetch = httpclient.RealHttpFetch(real_dns_lookup) 197 request = httparchive.ArchivedHttpRequest( 198 command='GET', host='google.com', full_path='/search?q=dogs', 199 request_body=None, headers={}, is_ssl=False) 200 response = fetch(request) 201 self.assertIsNotNone(response) 202 203 def testFetchSSLRequest(self): 204 real_dns_lookup = dnsproxy.RealDnsLookup( 205 name_servers=[platformsettings.get_original_primary_nameserver()], 206 dns_forwarding=False, proxy_host='127.0.0.1', proxy_port=5353) 207 fetch = httpclient.RealHttpFetch(real_dns_lookup) 208 request = httparchive.ArchivedHttpRequest( 209 command='GET', host='google.com', full_path='/search?q=dogs', 210 request_body=None, headers={}, is_ssl=True) 211 response = fetch(request) 212 self.assertIsNotNone(response) 213 214 215class HttpArchiveFetchTest(unittest.TestCase): 216 217 TEST_REQUEST_TIME = datetime.datetime(2016, 11, 17, 1, 2, 3, 456) 218 219 def createTestResponse(self): 220 return httparchive.ArchivedHttpResponse( 221 11, 200, 'OK', [('content-type', 'text/html')], 222 ['<body>test</body>'], 223 request_time=HttpArchiveFetchTest.TEST_REQUEST_TIME) 224 225 def checkTestResponse(self, actual_response, archive, request): 226 self.assertEqual(actual_response, archive[request]) 227 self.assertEqual(['<body>test</body>'], actual_response.response_data) 228 self.assertEqual(HttpArchiveFetchTest.TEST_REQUEST_TIME, 229 actual_response.request_time) 230 231 @staticmethod 232 def dummy_injector(_): 233 return '<body>test</body>' 234 235 236class RecordHttpArchiveFetchTest(HttpArchiveFetchTest): 237 238 @mock.patch('httpclient.RealHttpFetch') 239 def testFetch(self, real_http_fetch): 240 http_fetch_instance = real_http_fetch.return_value 241 response = self.createTestResponse() 242 http_fetch_instance.return_value = response 243 archive = httparchive.HttpArchive() 244 fetch = httpclient.RecordHttpArchiveFetch(archive, self.dummy_injector) 245 request = httparchive.ArchivedHttpRequest( 246 'GET', 'www.test.com', '/', None, {}) 247 self.checkTestResponse(fetch(request), archive, request) 248 249 250class ReplayHttpArchiveFetchTest(HttpArchiveFetchTest): 251 252 def testFetch(self): 253 request = httparchive.ArchivedHttpRequest( 254 'GET', 'www.test.com', '/', None, {}) 255 response = self.createTestResponse() 256 archive = httparchive.HttpArchive() 257 archive[request] = response 258 fetch = httpclient.ReplayHttpArchiveFetch( 259 archive, None, self.dummy_injector) 260 self.checkTestResponse(fetch(request), archive, request) 261 262 @mock.patch('script_injector.util.resource_string') 263 @mock.patch('script_injector.util.resource_exists') 264 @mock.patch('script_injector.os.path.exists') 265 def testInjectedDate(self, os_path, util_exists, util_resource_string): 266 os_path.return_value = False 267 util_exists.return_value = True 268 util_resource_string.return_value = \ 269 ["""var time_seed={}""".format(script_injector.TIME_SEED_MARKER)] 270 request = httparchive.ArchivedHttpRequest( 271 'GET', 'www.test.com', '/', None, {}) 272 response = self.createTestResponse() 273 archive = httparchive.HttpArchive() 274 archive[request] = response 275 276 fetch = httpclient.ReplayHttpArchiveFetch( 277 archive, None, script_injector.GetScriptInjector("time_script.js")) 278 self.assertEqual( 279 ['<script>var time_seed=1479344523000</script><body>test</body>'], 280 fetch(request).response_data) 281 282 283if __name__ == '__main__': 284 unittest.main() 285