#!/usr/bin/env python
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for the httparchive module."""

import calendar
import email.utils
import httparchive
import unittest


def create_request(headers):
  """Return an archived 'GET www.test.com/' request carrying `headers`."""
  return httparchive.ArchivedHttpRequest(
      'GET', 'www.test.com', '/', None, headers)


def create_response(headers):
  """Return an archived '200 OK' response with `headers` and an empty body.

  NOTE(review): the leading 11 is presumably the HTTP version (1.1); confirm
  against httparchive.ArchivedHttpResponse.
  """
  return httparchive.ArchivedHttpResponse(
      11, 200, 'OK', headers, '')


class HttpArchiveTest(unittest.TestCase):
  """Tests for HttpArchive lookups and ArchivedHttpRequest helpers."""

  REQUEST_HEADERS = {}
  REQUEST = create_request(REQUEST_HEADERS)

  # Used for if-(un)modified-since checks
  DATE_PAST = 'Wed, 13 Jul 2011 03:58:08 GMT'
  DATE_PRESENT = 'Wed, 20 Jul 2011 04:58:08 GMT'
  DATE_FUTURE = 'Wed, 27 Jul 2011 05:58:08 GMT'
  DATE_INVALID = 'This is an invalid date!!'

  # etag values
  ETAG_VALID = 'etag'
  ETAG_INVALID = 'This is an invalid etag value!!'

  RESPONSE_HEADERS = [('last-modified', DATE_PRESENT), ('etag', ETAG_VALID)]
  RESPONSE = create_response(RESPONSE_HEADERS)

  def setUp(self):
    """Create an archive holding RESPONSE for a GET and an identical POST."""
    self.archive = httparchive.HttpArchive()
    self.archive[self.REQUEST] = self.RESPONSE

    # Also add an identical POST request for testing
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, self.REQUEST_HEADERS)
    self.archive[request] = self.RESPONSE

  def tearDown(self):
    pass

  def test_init(self):
    """A freshly constructed archive is empty."""
    archive = httparchive.HttpArchive()
    self.assertEqual(len(archive), 0)

  def test__TrimHeaders(self):
    """_TrimHeaders drops or normalizes selected request headers."""
    request = httparchive.ArchivedHttpRequest
    header1 = {'accept-encoding': 'gzip,deflate'}
    self.assertEqual(request._TrimHeaders(header1), list(header1.items()))

    header2 = {'referer': 'www.google.com'}
    self.assertEqual(request._TrimHeaders(header2), [])

    header3 = {'referer': 'www.google.com', 'cookie': 'cookie_monster!',
               'hello': 'world'}
    self.assertEqual(request._TrimHeaders(header3), [('hello', 'world')])

    # Tests that spaces and trailing comma get stripped.
    header4 = {'accept-encoding': 'gzip, deflate,, '}
    self.assertEqual(request._TrimHeaders(header4),
                     [('accept-encoding', 'gzip,deflate')])

    # Tests that 'lzma' gets stripped.
    header5 = {'accept-encoding': 'gzip, deflate, lzma'}
    self.assertEqual(request._TrimHeaders(header5),
                     [('accept-encoding', 'gzip,deflate')])

    # Tests that x-client-data gets stripped.
    header6 = {'x-client-data': 'testdata'}
    self.assertEqual(request._TrimHeaders(header6), [])

  def test_matches(self):
    """matches() honors use_query and treats None arguments as wildcards."""
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?hello=world', None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?foo=bar', None, headers)

    # assertTrue/assertFalse replace the deprecated assert_ alias, which was
    # removed from unittest in Python 3.12.
    self.assertFalse(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=False))

    self.assertTrue(request1.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, None, request2.full_path, use_query=False))

    empty_request = httparchive.ArchivedHttpRequest(
        None, None, None, None, headers)
    self.assertFalse(empty_request.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertFalse(empty_request.matches(
        request2.command, None, request2.full_path, use_query=False))

  def setup_find_closest_request(self):
    """Build the fixture shared by the find_closest_request tests.

    Returns:
      A tuple (archive, request1, request2, request3, request4) in which only
      request2 ('/a?foo=bar') and request3 ('/b?hello=world') are stored in
      the archive; request1 ('/a?hello=world') and request4
      ('/c?hello=world') are used as lookup keys.
    """
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?hello=world', None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?foo=bar', None, headers)
    request3 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/b?hello=world', None, headers)
    request4 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/c?hello=world', None, headers)

    archive = httparchive.HttpArchive()
    # Add requests 2 and 3 and find closest match with request1
    archive[request2] = self.RESPONSE
    archive[request3] = self.RESPONSE

    return archive, request1, request2, request3, request4

  def test_find_closest_request(self):
    """find_closest_request prefers same-path entries; use_path is strict."""
    archive, request1, request2, request3, request4 = (
        self.setup_find_closest_request())

    # Always favor requests with same paths, even if use_path=False.
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=False))
    # If we match strictly on path, request2 is the only match
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))
    # request4 can be matched with request3, if use_path=False
    self.assertEqual(
        request3, archive.find_closest_request(request4, use_path=False))
    # ...but None, if use_path=True
    self.assertEqual(
        None, archive.find_closest_request(request4, use_path=True))

  def test_find_closest_request_delete_simple(self):
    """Deleting the different-path entry leaves the same-path match intact."""
    archive, request1, request2, request3, _ = (
        self.setup_find_closest_request())

    del archive[request3]
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=False))
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))

  def test_find_closest_request_delete_complex(self):
    """Deleting the same-path entry falls back to request3 or to None."""
    archive, request1, request2, request3, _ = (
        self.setup_find_closest_request())

    del archive[request2]
    self.assertEqual(
        request3, archive.find_closest_request(request1, use_path=False))
    self.assertEqual(
        None, archive.find_closest_request(request1, use_path=True))

  def test_find_closest_request_timestamp(self):
    """Closest match is decided by query semantics, not raw string order."""
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=100000000&important=true',
        None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=99999999&important=true',
        None, headers)
    request3 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=10000000&important=false',
        None, headers)
    archive = httparchive.HttpArchive()
    # Add requests 2 and 3 and find closest match with request1
    archive[request2] = self.RESPONSE
    archive[request3] = self.RESPONSE

    # Although request3 is lexicographically closer, request2 is semantically
    # more similar.
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))

  def test_get_cmp_seq(self):
    """_GetCmpSeq returns sorted query pairs followed by sorted headers."""
    # The order of key-value pairs in query and header respectively should not
    # matter.
    headers = {'k2': 'v2', 'k1': 'v1'}
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?c=d&a=b;e=f', None, headers)
    self.assertEqual([('a', 'b'), ('c', 'd'), ('e', 'f'),
                      ('k1', 'v1'), ('k2', 'v2')],
                     request._GetCmpSeq('c=d&a=b;e=f'))

  def test_get_simple(self):
    """get() returns the stored response, or the default when not found."""
    request = self.REQUEST
    response = self.RESPONSE
    archive = self.archive

    self.assertEqual(archive.get(request), response)

    false_request_headers = {'foo': 'bar'}
    false_request = create_request(false_request_headers)
    self.assertEqual(archive.get(false_request, default=None), None)

  def test_get_modified_headers(self):
    """get() evaluates if-modified-since against the archived response."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # Fail check and return response again
    request_headers = {'if-modified-since': self.DATE_PAST}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Succeed check and return 304 Not Modified
    request_headers = {'if-modified-since': self.DATE_FUTURE}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Succeed check and return 304 Not Modified
    request_headers = {'if-modified-since': self.DATE_PRESENT}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Invalid date, fail check and return response again
    request_headers = {'if-modified-since': self.DATE_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # fail check since the request is not a GET or HEAD request (as per RFC)
    request_headers = {'if-modified-since': self.DATE_FUTURE}
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_unmodified_headers(self):
    """get() evaluates if-unmodified-since against the archived response."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PAST}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_FUTURE}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PRESENT}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Fail check since the request is not a GET or HEAD request (as per RFC).
    # DATE_PAST yields a 304 for a GET above, so getting the full response
    # here shows that the header is ignored for a POST.
    request_headers = {'if-unmodified-since': self.DATE_PAST}
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_etags(self):
    """get() evaluates if-match and if-none-match against the archived etag."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    # if-match headers
    request_headers = {'if-match': self.ETAG_VALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    request_headers = {'if-match': self.ETAG_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

    # if-none-match headers
    request_headers = {'if-none-match': self.ETAG_VALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    request_headers = {'if-none-match': self.ETAG_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_multiple_match_headers(self):
    """get() combines if-match with if-modified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    # if-match headers
    # If the request would, without the If-Match header field,
    # result in anything other than a 2xx or 412 status,
    # then the If-Match header MUST be ignored.

    request_headers = {
        'if-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Invalid etag, precondition failed
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

    # 304 response; ignore if-match header
    request_headers = {
        'if-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_FUTURE,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # 304 response; ignore if-match header
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PRESENT,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Invalid etag, precondition failed
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_INVALID,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

  def test_get_multiple_none_match_headers(self):
    """get() combines if-none-match with if-modified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # if-none-match headers
    # If the request would, without the If-None-Match header field,
    # result in anything other than a 2xx or 304 status,
    # then the If-None-Match header MUST be ignored.

    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # etag match and not modified since; expect 304 Not Modified
    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_FUTURE,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PRESENT,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_INVALID,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)


class ArchivedHttpResponse(unittest.TestCase):
  """Tests for httparchive.ArchivedHttpResponse.update_date.

  The assertions below expect update_date to shift a date by the difference
  between `now` and the response's own 'date' header, and to return its input
  unchanged when either the input or that header cannot be parsed.
  """

  PAST_DATE_A = 'Tue, 13 Jul 2010 03:47:07 GMT'
  PAST_DATE_B = 'Tue, 13 Jul 2010 02:47:07 GMT'  # PAST_DATE_A -1 hour
  PAST_DATE_C = 'Tue, 13 Jul 2010 04:47:07 GMT'  # PAST_DATE_A +1 hour
  NOW_DATE_A = 'Wed, 20 Jul 2011 04:58:08 GMT'
  NOW_DATE_B = 'Wed, 20 Jul 2011 03:58:08 GMT'  # NOW_DATE_A -1 hour
  NOW_DATE_C = 'Wed, 20 Jul 2011 05:58:08 GMT'  # NOW_DATE_A +1 hour
  # NOW_DATE_A as seconds since the epoch, passed as `now` to update_date.
  NOW_SECONDS = calendar.timegm(email.utils.parsedate(NOW_DATE_A))

  def setUp(self):
    """Create a response whose 'date' header is PAST_DATE_A."""
    self.response = create_response([('date', self.PAST_DATE_A)])

  def test_update_date_same_date(self):
    """A date equal to the 'date' header maps to NOW_DATE_A."""
    self.assertEqual(
        self.response.update_date(self.PAST_DATE_A, now=self.NOW_SECONDS),
        self.NOW_DATE_A)

  def test_update_date_before_date(self):
    """A date one hour before the header maps to one hour before now."""
    self.assertEqual(
        self.response.update_date(self.PAST_DATE_B, now=self.NOW_SECONDS),
        self.NOW_DATE_B)

  def test_update_date_after_date(self):
    """A date one hour after the header maps to one hour after now."""
    self.assertEqual(
        self.response.update_date(self.PAST_DATE_C, now=self.NOW_SECONDS),
        self.NOW_DATE_C)

  def test_update_date_bad_date_param(self):
    """An unparseable date argument is returned unchanged."""
    self.assertEqual(
        self.response.update_date('garbage date', now=self.NOW_SECONDS),
        'garbage date')

  def test_update_date_bad_date_header(self):
    """With an unparseable 'date' header the argument is returned unchanged."""
    self.response.set_header('date', 'garbage date')
    self.assertEqual(
        self.response.update_date(self.PAST_DATE_B, now=self.NOW_SECONDS),
        self.PAST_DATE_B)


if __name__ == '__main__':
  unittest.main()