1#!/usr/bin/env python
2# Copyright 2012 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import mock
17import unittest
18
19import datetime
20import dnsproxy
21import httparchive
22import httpclient
23import platformsettings
24import script_injector
25import test_utils
26
27
class RealHttpFetchTest(unittest.TestCase):
  """Exercises the header-parsing helpers of httpclient.RealHttpFetch.

  _GetHeaderNameValue is given one raw header line and _ToTuples a list of
  them; both are called on the class itself rather than on an instance.
  """

  # Raw header lines used as inputs by the tests below.
  CONTENT_TYPE = 'content-type: image/x-icon'
  COOKIE_1 = ('Set-Cookie: GMAIL_IMP=EXPIRED; '
              'Expires=Thu, 12-Jul-2012 22:41:22 GMT; '
              'Path=/mail; Secure')
  COOKIE_2 = ('Set-Cookie: GMAIL_STAT_205a=EXPIRED; '
              'Expires=Thu, 12-Jul-2012 22:42:24 GMT; '
              'Path=/mail; Secure')
  # FIRST_LINE is a complete header; SECOND_LINE and THIRD_LINE start with
  # whitespace and are used as its continuation lines.
  FIRST_LINE = 'fake-header: first line'
  SECOND_LINE = ' second line'
  THIRD_LINE = '\tthird line'
  # No "name: value" separator at all.
  BAD_HEADER = 'this is a bad header'

  def test__GetHeaderNameValueBasic(self):
    """A well-formed header line is split into a (name, value) pair."""
    parsed = httpclient.RealHttpFetch._GetHeaderNameValue(self.CONTENT_TYPE)
    self.assertEqual(parsed, ('content-type', 'image/x-icon'))

  def test__GetHeaderNameValueLowercasesName(self):
    """The header name comes back lowercased; the value keeps its case."""
    mixed_case_line = (
        'X-Google-Gfe-Backend-Request-Info: eid=1KMAUMeiK4eMiAL52YyMBg')
    parsed = httpclient.RealHttpFetch._GetHeaderNameValue(mixed_case_line)
    self.assertEqual(
        parsed,
        ('x-google-gfe-backend-request-info', 'eid=1KMAUMeiK4eMiAL52YyMBg'))

  def test__GetHeaderNameValueBadLineGivesNone(self):
    """A line that is not in header format yields None."""
    parsed = httpclient.RealHttpFetch._GetHeaderNameValue(self.BAD_HEADER)
    self.assertIsNone(parsed)

  def test__ToTuplesBasic(self):
    """Well-formed header lines become a list of (name, value) pairs."""
    raw_lines = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE]
    parsed = httpclient.RealHttpFetch._ToTuples(raw_lines)
    self.assertEqual(parsed, [
        ('content-type', 'image/x-icon'),
        # [12:] drops the leading 'Set-Cookie: ' prefix.
        ('set-cookie', self.COOKIE_1[12:]),
        ('fake-header', 'first line'),
    ])

  def test__ToTuplesMultipleHeadersWithSameName(self):
    """Multiple headers with the same name are each kept, in order."""
    raw_lines = [
        self.CONTENT_TYPE, self.COOKIE_1, self.COOKIE_2, self.FIRST_LINE]
    parsed = httpclient.RealHttpFetch._ToTuples(raw_lines)
    self.assertEqual(parsed, [
        ('content-type', 'image/x-icon'),
        ('set-cookie', self.COOKIE_1[12:]),
        ('set-cookie', self.COOKIE_2[12:]),
        ('fake-header', 'first line'),
    ])

  def test__ToTuplesAppendsContinuationLine(self):
    """Continuation lines are appended to the value of the header before them."""
    raw_lines = [self.CONTENT_TYPE, self.COOKIE_1, self.FIRST_LINE,
                 self.SECOND_LINE, self.THIRD_LINE]
    parsed = httpclient.RealHttpFetch._ToTuples(raw_lines)
    self.assertEqual(parsed, [
        ('content-type', 'image/x-icon'),
        ('set-cookie', self.COOKIE_1[12:]),
        # Note the expected value has a space where THIRD_LINE has a tab.
        ('fake-header', 'first line\n second line\n third line'),
    ])

  def test__ToTuplesIgnoresBadHeader(self):
    """A malformed line between valid headers is dropped from the result."""
    raw_lines = [self.CONTENT_TYPE, self.BAD_HEADER, self.COOKIE_1]
    parsed = httpclient.RealHttpFetch._ToTuples(raw_lines)
    self.assertEqual(parsed, [
        ('content-type', 'image/x-icon'),
        ('set-cookie', self.COOKIE_1[12:]),
    ])

  def test__ToTuplesIgnoresMisplacedContinuationLine(self):
    """A continuation line with no header before it is dropped."""
    raw_lines = [self.THIRD_LINE, self.CONTENT_TYPE,
                 self.COOKIE_1, self.FIRST_LINE, self.SECOND_LINE]
    parsed = httpclient.RealHttpFetch._ToTuples(raw_lines)
    self.assertEqual(parsed, [
        ('content-type', 'image/x-icon'),
        ('set-cookie', self.COOKIE_1[12:]),
        ('fake-header', 'first line\n second line'),
    ])
123
124
class RealHttpFetchGetConnectionTest(unittest.TestCase):
  """Test that a connection is made with request IP/port or proxy IP/port."""

  def setUp(self):
    """Create a RealHttpFetch with canned DNS answers and a stubbed proxy."""
    def real_dns_lookup(host):
      # Only these two names resolve; any other host raises KeyError.
      return {
          'example.com': '127.127.127.127',
          'proxy.com': '2.2.2.2',
          }[host]
    self.fetch = httpclient.RealHttpFetch(real_dns_lookup)
    self.https_proxy = None
    self.http_proxy = None
    def get_proxy(is_ssl):
      return self.https_proxy if is_ssl else self.http_proxy
    # Replace the fetcher's proxy lookup so each test controls which proxy
    # (if any) is reported for HTTP and for HTTPS.
    self.fetch._get_system_proxy = get_proxy

  def set_http_proxy(self, host, port):
    """Set the proxy reported for non-SSL connections."""
    self.http_proxy = platformsettings.SystemProxy(host, port)

  def set_https_proxy(self, host, port):
    """Set the proxy reported for SSL connections."""
    self.https_proxy = platformsettings.SystemProxy(host, port)

  def test_get_connection_without_proxy_connects_to_host_ip(self):
    """HTTP connection with no proxy connects to host IP."""
    self.set_http_proxy(host=None, port=None)
    connection = self.fetch._get_connection('example.com', None, is_ssl=False)
    self.assertEqual('127.127.127.127', connection.host)
    self.assertEqual(80, connection.port)  # default HTTP port

  def test_get_connection_without_proxy_uses_nondefault_request_port(self):
    """HTTP connection with no proxy connects with request port."""
    # The connection below is non-SSL, so it is the HTTP proxy that must be
    # cleared; the HTTPS proxy is never consulted for this request.
    self.set_http_proxy(host=None, port=None)
    connection = self.fetch._get_connection('example.com', 8888, is_ssl=False)
    self.assertEqual('127.127.127.127', connection.host)
    self.assertEqual(8888, connection.port)  # request HTTP port

  def test_get_connection_with_proxy_uses_proxy_port(self):
    """HTTP connection with proxy connects using the proxy port."""
    self.set_http_proxy(host='proxy.com', port=None)
    connection = self.fetch._get_connection('example.com', 8888, is_ssl=False)
    self.assertEqual('2.2.2.2', connection.host)  # proxy IP
    self.assertEqual(80, connection.port)  # proxy port (default HTTP)

  def test_ssl_get_connection_without_proxy_connects_to_host_ip(self):
    """HTTPS (SSL) connection with no proxy connects to host IP."""
    self.set_https_proxy(host=None, port=None)
    connection = self.fetch._get_connection('example.com', None, is_ssl=True)
    self.assertEqual('127.127.127.127', connection.host)
    self.assertEqual(443, connection.port)  # default SSL port

  def test_ssl_get_connection_with_proxy_connects_to_proxy_ip(self):
    """HTTPS (SSL) connection with proxy connects to proxy IP."""
    self.set_https_proxy(host='proxy.com', port=8443)
    connection = self.fetch._get_connection('example.com', None, is_ssl=True)
    self.assertEqual('2.2.2.2', connection.host)  # proxy IP
    self.assertEqual(8443, connection.port)  # SSL proxy port

  def test_ssl_get_connection_with_proxy_tunnels_to_host(self):
    """HTTPS (SSL) connection with proxy tunnels to target host."""
    self.set_https_proxy(host='proxy.com', port=8443)
    connection = self.fetch._get_connection('example.com', 9443, is_ssl=True)
    self.assertEqual('example.com', connection._tunnel_host)  # host name
    self.assertEqual(9443, connection._tunnel_port)  # host port
188
189
class ActualNetworkFetchTest(test_utils.RealNetworkFetchTest):
  """Fetches a google.com search page via RealHttpFetch, plain and over SSL."""

  def _fetch_search_page(self, is_ssl):
    """Issue GET google.com/search?q=dogs and return the fetch result.

    Args:
      is_ssl: passed through as the request's is_ssl flag.
    """
    nameserver = platformsettings.get_original_primary_nameserver()
    real_dns_lookup = dnsproxy.RealDnsLookup(
        name_servers=[nameserver],
        dns_forwarding=False, proxy_host='127.0.0.1', proxy_port=5353)
    fetch = httpclient.RealHttpFetch(real_dns_lookup)
    request = httparchive.ArchivedHttpRequest(
        command='GET', host='google.com', full_path='/search?q=dogs',
        request_body=None, headers={}, is_ssl=is_ssl)
    return fetch(request)

  def testFetchNonSSLRequest(self):
    """A non-SSL fetch produces a response object."""
    self.assertIsNotNone(self._fetch_search_page(is_ssl=False))

  def testFetchSSLRequest(self):
    """An SSL fetch produces a response object."""
    self.assertIsNotNone(self._fetch_search_page(is_ssl=True))
213
214
class HttpArchiveFetchTest(unittest.TestCase):
  """Shared fixtures for the archive-backed fetch tests in this module.

  Defines no test methods of its own: it supplies a canned response, a
  matching verification helper, and a fixed-output injector.
  """

  # Request time stamped on every canned response.
  TEST_REQUEST_TIME = datetime.datetime(2016, 11, 17, 1, 2, 3, 456)

  def createTestResponse(self):
    """Return a canned 200 'OK' text/html ArchivedHttpResponse."""
    headers = [('content-type', 'text/html')]
    body_chunks = ['<body>test</body>']
    # NOTE(review): the leading 11 is presumably the HTTP version (1.1);
    # confirm against httparchive.ArchivedHttpResponse.
    return httparchive.ArchivedHttpResponse(
        11, 200, 'OK', headers, body_chunks,
        request_time=HttpArchiveFetchTest.TEST_REQUEST_TIME)

  def checkTestResponse(self, actual_response, archive, request):
    """Assert actual_response is the canned response stored under request.

    Args:
      actual_response: the response returned by the fetch under test.
      archive: mapping that is indexed by request to get the stored response.
      request: the request that was fetched.
    """
    archived_response = archive[request]
    self.assertEqual(actual_response, archived_response)
    self.assertEqual(['<body>test</body>'], actual_response.response_data)
    expected_time = HttpArchiveFetchTest.TEST_REQUEST_TIME
    self.assertEqual(expected_time, actual_response.request_time)

  @staticmethod
  def dummy_injector(_):
    """Ignore the argument and return the canned body markup."""
    canned_markup = '<body>test</body>'
    return canned_markup
234
235
class RecordHttpArchiveFetchTest(HttpArchiveFetchTest):
  """Checks RecordHttpArchiveFetch with RealHttpFetch patched out."""

  @mock.patch('httpclient.RealHttpFetch')
  def testFetch(self, real_http_fetch):
    """The fetched response is returned and stored in the archive."""
    canned_response = self.createTestResponse()
    # real_http_fetch stands in for the class: its return_value is the
    # instance it constructs, and calling that instance yields the response.
    real_http_fetch.return_value.return_value = canned_response
    archive = httparchive.HttpArchive()
    fetch = httpclient.RecordHttpArchiveFetch(archive, self.dummy_injector)
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/', None, {})
    recorded_response = fetch(request)
    self.checkTestResponse(recorded_response, archive, request)
248
249
class ReplayHttpArchiveFetchTest(HttpArchiveFetchTest):
  """Checks ReplayHttpArchiveFetch against a pre-populated archive."""

  def testFetch(self):
    """A request already in the archive is answered with its stored response."""
    archive = httparchive.HttpArchive()
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/', None, {})
    archive[request] = self.createTestResponse()
    fetch = httpclient.ReplayHttpArchiveFetch(
        archive, None, self.dummy_injector)
    replayed_response = fetch(request)
    self.checkTestResponse(replayed_response, archive, request)

  # Patches are applied bottom-up, so the mock for the decorator closest to
  # the function is the first mock argument.
  @mock.patch('script_injector.util.resource_string')
  @mock.patch('script_injector.util.resource_exists')
  @mock.patch('script_injector.os.path.exists')
  def testInjectedDate(self, path_exists, resource_exists, resource_string):
    """The time-seed marker in the injected script is replaced by a number.

    The expected seed, 1479344523000, equals TEST_REQUEST_TIME in epoch
    milliseconds when that time is read as UTC.
    """
    # The script is reported missing on disk but present as a resource.
    path_exists.return_value = False
    resource_exists.return_value = True
    script_source = 'var time_seed={}'.format(script_injector.TIME_SEED_MARKER)
    resource_string.return_value = [script_source]

    archive = httparchive.HttpArchive()
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/', None, {})
    archive[request] = self.createTestResponse()

    injector = script_injector.GetScriptInjector('time_script.js')
    fetch = httpclient.ReplayHttpArchiveFetch(archive, None, injector)
    self.assertEqual(
        ['<script>var time_seed=1479344523000</script><body>test</body>'],
        fetch(request).response_data)
281
282
if __name__ == "__main__":
  # Run as a script rather than imported: hand control to unittest.
  unittest.main()
285