1#!/usr/bin/env python
2# Copyright 2011 Google Inc. All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import calendar
17import email.utils
18import httparchive
19import unittest
20
21
def create_request(headers):
  """Return an archived GET request for 'www.test.com' and path '/'.

  Args:
    headers: request headers, passed through to ArchivedHttpRequest unchanged.
  """
  command = 'GET'
  host = 'www.test.com'
  full_path = '/'
  # The fourth positional argument is None in every request built by this
  # file (presumably the request body -- confirm against httparchive).
  return httparchive.ArchivedHttpRequest(
      command, host, full_path, None, headers)
25
def create_response(headers):
  """Return an archived '200 OK' response with the given headers.

  Args:
    headers: response headers, passed through to ArchivedHttpResponse
        unchanged.
  """
  # 11 is presumably the HTTP version (1.1) and '' the empty response body --
  # confirm against httparchive.ArchivedHttpResponse.
  status = 200
  reason = 'OK'
  return httparchive.ArchivedHttpResponse(11, status, reason, headers, '')
29
30
class HttpArchiveTest(unittest.TestCase):
  """Tests for httparchive.HttpArchive and httparchive.ArchivedHttpRequest.

  setUp() builds an archive that maps a GET and a POST request for
  'www.test.com/' to RESPONSE, whose 'last-modified' header is DATE_PRESENT
  and whose 'etag' header is ETAG_VALID.
  """

  REQUEST_HEADERS = {}
  REQUEST = create_request(REQUEST_HEADERS)

  # Used for if-(un)modified-since checks
  DATE_PAST = 'Wed, 13 Jul 2011 03:58:08 GMT'
  DATE_PRESENT = 'Wed, 20 Jul 2011 04:58:08 GMT'
  DATE_FUTURE = 'Wed, 27 Jul 2011 05:58:08 GMT'
  DATE_INVALID = 'This is an invalid date!!'

  # etag values
  ETAG_VALID = 'etag'
  ETAG_INVALID = 'This is an invalid etag value!!'

  RESPONSE_HEADERS = [('last-modified', DATE_PRESENT), ('etag', ETAG_VALID)]
  RESPONSE = create_response(RESPONSE_HEADERS)

  def setUp(self):
    """Archive RESPONSE under both a GET and a POST request."""
    self.archive = httparchive.HttpArchive()
    self.archive[self.REQUEST] = self.RESPONSE

    # Also add an identical POST request for testing
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, self.REQUEST_HEADERS)
    self.archive[request] = self.RESPONSE

  def test_init(self):
    """A freshly constructed archive has length zero."""
    archive = httparchive.HttpArchive()
    self.assertEqual(len(archive), 0)

  def test__TrimHeaders(self):
    """_TrimHeaders drops or normalizes selected request headers."""
    request_class = httparchive.ArchivedHttpRequest

    # A plain accept-encoding header is returned as-is.
    header1 = {'accept-encoding': 'gzip,deflate'}
    self.assertEqual(request_class._TrimHeaders(header1),
                     list(header1.items()))

    # Tests that referer gets stripped.
    header2 = {'referer': 'www.google.com'}
    self.assertEqual(request_class._TrimHeaders(header2), [])

    # Tests that referer and cookie get stripped while others are kept.
    header3 = {'referer': 'www.google.com', 'cookie': 'cookie_monster!',
               'hello': 'world'}
    self.assertEqual(request_class._TrimHeaders(header3),
                     [('hello', 'world')])

    # Tests that spaces and trailing comma get stripped.
    header4 = {'accept-encoding': 'gzip, deflate,, '}
    self.assertEqual(request_class._TrimHeaders(header4),
                     [('accept-encoding', 'gzip,deflate')])

    # Tests that 'lzma' gets stripped.
    header5 = {'accept-encoding': 'gzip, deflate, lzma'}
    self.assertEqual(request_class._TrimHeaders(header5),
                     [('accept-encoding', 'gzip,deflate')])

    # Tests that x-client-data gets stripped.
    header6 = {'x-client-data': 'testdata'}
    self.assertEqual(request_class._TrimHeaders(header6), [])

  def test_matches(self):
    """matches() honours use_query and accepts None for host or path."""
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?hello=world', None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?foo=bar', None, headers)

    # Same path, different query: only a match when the query is ignored.
    self.assertFalse(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, request2.host, request2.full_path, use_query=False))

    # Passing None for the path or the host still matches.
    self.assertTrue(request1.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertTrue(request1.matches(
        request2.command, None, request2.full_path, use_query=False))

    # A request built entirely from None values matches neither form.
    empty_request = httparchive.ArchivedHttpRequest(
        None, None, None, None, headers)
    self.assertFalse(empty_request.matches(
        request2.command, request2.host, None, use_query=True))
    self.assertFalse(empty_request.matches(
        request2.command, None, request2.full_path, use_query=False))

  def setup_find_closest_request(self):
    """Build the fixture shared by the find_closest_request tests.

    Returns:
      (archive, request1, request2, request3, request4), where the archive
      contains only request2 ('/a?foo=bar') and request3 ('/b?hello=world').
      request1 ('/a?hello=world') and request4 ('/c?hello=world') are left
      out so they can be used as lookup keys.
    """
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?hello=world', None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?foo=bar', None, headers)
    request3 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/b?hello=world', None, headers)
    request4 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/c?hello=world', None, headers)

    archive = httparchive.HttpArchive()
    # Add requests 2 and 3 and find closest match with request1
    archive[request2] = self.RESPONSE
    archive[request3] = self.RESPONSE

    return archive, request1, request2, request3, request4

  def test_find_closest_request(self):
    """find_closest_request() prefers same-path entries."""
    archive, request1, request2, request3, request4 = (
        self.setup_find_closest_request())

    # Always favor requests with same paths, even if use_path=False.
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=False))
    # If we match strictly on path, request2 is the only match
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))
    # request4 can be matched with request3, if use_path=False
    self.assertEqual(
        request3, archive.find_closest_request(request4, use_path=False))
    # ...but None, if use_path=True
    self.assertEqual(
        None, archive.find_closest_request(request4, use_path=True))

  def test_find_closest_request_delete_simple(self):
    """Deleting an unrelated entry leaves the closest match unchanged."""
    archive, request1, request2, request3, _ = (
        self.setup_find_closest_request())

    del archive[request3]
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=False))
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))

  def test_find_closest_request_delete_complex(self):
    """Deleting the same-path entry leaves only the other-path fallback."""
    archive, request1, request2, request3, _ = (
        self.setup_find_closest_request())

    del archive[request2]
    # With request2 gone, request3 is only reachable when paths may differ.
    self.assertEqual(
        request3, archive.find_closest_request(request1, use_path=False))
    self.assertEqual(
        None, archive.find_closest_request(request1, use_path=True))

  def test_find_closest_request_timestamp(self):
    """Closest match is chosen per query parameter, not by raw string."""
    headers = {}
    request1 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=100000000&important=true',
        None, headers)
    request2 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=99999999&important=true',
        None, headers)
    request3 = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/index.html?time=10000000&important=false',
        None, headers)
    archive = httparchive.HttpArchive()
    # Add requests 2 and 3 and find closest match with request1
    archive[request2] = self.RESPONSE
    archive[request3] = self.RESPONSE

    # Although request3 is lexicographically closer, request2 is semantically
    # more similar.
    self.assertEqual(
        request2, archive.find_closest_request(request1, use_path=True))

  def test_get_cmp_seq(self):
    """_GetCmpSeq() yields sorted query pairs followed by sorted headers."""
    # The order of key-value pairs in query and header respectively should not
    # matter.
    headers = {'k2': 'v2', 'k1': 'v1'}
    request = httparchive.ArchivedHttpRequest(
        'GET', 'www.test.com', '/a?c=d&a=b;e=f', None, headers)
    self.assertEqual([('a', 'b'), ('c', 'd'), ('e', 'f'),
                      ('k1', 'v1'), ('k2', 'v2')],
                     request._GetCmpSeq('c=d&a=b;e=f'))

  def test_get_simple(self):
    """get() returns the stored response, or the default when unknown."""
    request = self.REQUEST
    response = self.RESPONSE
    archive = self.archive

    self.assertEqual(archive.get(request), response)

    # A request with an extra header is not in the archive.
    false_request_headers = {'foo': 'bar'}
    false_request = create_request(false_request_headers)
    self.assertEqual(archive.get(false_request, default=None), None)

  def test_get_modified_headers(self):
    """get() handling of the if-modified-since request header."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # Fail check and return response again
    request_headers = {'if-modified-since': self.DATE_PAST}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Succeed check and return 304 Not Modified
    request_headers = {'if-modified-since': self.DATE_FUTURE}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Succeed check and return 304 Not Modified
    request_headers = {'if-modified-since': self.DATE_PRESENT}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Invalid date, fail check and return response again
    request_headers = {'if-modified-since': self.DATE_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # fail check since the request is not a GET or HEAD request (as per RFC)
    request_headers = {'if-modified-since': self.DATE_FUTURE}
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_unmodified_headers(self):
    """get() handling of the if-unmodified-since request header."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PAST}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_FUTURE}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Succeed check
    request_headers = {'if-unmodified-since': self.DATE_PRESENT}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Fail check
    request_headers = {'if-unmodified-since': self.DATE_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Fail check since the request is not a GET or HEAD request (as per RFC).
    # DATE_PAST gives a 304 for the GET case above, so receiving the full
    # response here shows the header was ignored for POST.
    request_headers = {'if-unmodified-since': self.DATE_PAST}
    request = httparchive.ArchivedHttpRequest(
        'POST', 'www.test.com', '/', None, request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_etags(self):
    """get() handling of the if-match and if-none-match request headers."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    # if-match headers
    request_headers = {'if-match': self.ETAG_VALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    request_headers = {'if-match': self.ETAG_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

    # if-none-match headers
    request_headers = {'if-none-match': self.ETAG_VALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    request_headers = {'if-none-match': self.ETAG_INVALID}
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

  def test_get_multiple_match_headers(self):
    """get() when if-match is combined with if-modified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)
    precondition_failed_response = httparchive.create_response(412)

    # if-match headers
    # If the request would, without the If-Match header field,
    # result in anything other than a 2xx or 412 status,
    # then the If-Match header MUST be ignored.

    # Valid etag and modified since DATE_PAST: full response
    request_headers = {
        'if-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # Invalid etag, precondition failed
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

    # 304 response; ignore if-match header
    request_headers = {
        'if-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_FUTURE,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # 304 response; ignore if-match header
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PRESENT,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # Invalid etag, precondition failed
    request_headers = {
        'if-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_INVALID,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), precondition_failed_response)

  def test_get_multiple_none_match_headers(self):
    """get() when if-none-match is combined with if-modified-since."""
    response = self.RESPONSE
    archive = self.archive
    not_modified_response = httparchive.create_response(304)

    # if-none-match headers
    # If the request would, without the If-None-Match header field,
    # result in anything other than a 2xx or 304 status,
    # then the If-None-Match header MUST be ignored.

    # etag match, but modified since DATE_PAST: full response
    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # No etag match and modified since DATE_PAST: full response
    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PAST,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)

    # etag match and not modified since DATE_FUTURE: 304 Not Modified
    request_headers = {
        'if-none-match': self.ETAG_VALID,
        'if-modified-since': self.DATE_FUTURE,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # No etag match, but not modified since DATE_PRESENT: 304 Not Modified
    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_PRESENT,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), not_modified_response)

    # No etag match and an invalid date: full response
    request_headers = {
        'if-none-match': self.ETAG_INVALID,
        'if-modified-since': self.DATE_INVALID,
    }
    request = create_request(request_headers)
    self.assertEqual(archive.get(request), response)
400
401
class ArchivedHttpResponse(unittest.TestCase):
  """Tests for update_date() on a response created by create_response().

  The response under test carries a 'date' header of PAST_DATE_A, and every
  call passes NOW_SECONDS (the epoch value of NOW_DATE_A) as `now`.
  """

  PAST_DATE_A = 'Tue, 13 Jul 2010 03:47:07 GMT'
  PAST_DATE_B = 'Tue, 13 Jul 2010 02:47:07 GMT'  # PAST_DATE_A -1 hour
  PAST_DATE_C = 'Tue, 13 Jul 2010 04:47:07 GMT'  # PAST_DATE_A +1 hour
  NOW_DATE_A = 'Wed, 20 Jul 2011 04:58:08 GMT'
  NOW_DATE_B = 'Wed, 20 Jul 2011 03:58:08 GMT'  # NOW_DATE_A -1 hour
  NOW_DATE_C = 'Wed, 20 Jul 2011 05:58:08 GMT'  # NOW_DATE_A +1 hour
  # NOW_DATE_A as seconds since the epoch (UTC).
  NOW_SECONDS = calendar.timegm(
      email.utils.parsedate(NOW_DATE_A))

  def setUp(self):
    """Create a fresh response whose 'date' header is PAST_DATE_A."""
    date_header = ('date', self.PAST_DATE_A)
    self.response = create_response([date_header])

  def _update(self, date):
    """Return update_date(date) evaluated with `now` fixed at NOW_SECONDS."""
    return self.response.update_date(date, now=self.NOW_SECONDS)

  def test_update_date_same_date(self):
    """The header date itself is updated to NOW_DATE_A."""
    updated = self._update(self.PAST_DATE_A)
    self.assertEqual(updated, self.NOW_DATE_A)

  def test_update_date_before_date(self):
    """A date one hour before the header becomes one hour before now."""
    updated = self._update(self.PAST_DATE_B)
    self.assertEqual(updated, self.NOW_DATE_B)

  def test_update_date_after_date(self):
    """A date one hour after the header becomes one hour after now."""
    updated = self._update(self.PAST_DATE_C)
    self.assertEqual(updated, self.NOW_DATE_C)

  def test_update_date_bad_date_param(self):
    """An unparseable date argument is returned unchanged."""
    garbage = 'garbage date'
    self.assertEqual(self._update(garbage), 'garbage date')

  def test_update_date_bad_date_header(self):
    """With an unparseable 'date' header the argument comes back unchanged."""
    self.response.set_header('date', 'garbage date')
    updated = self._update(self.PAST_DATE_B)
    self.assertEqual(updated, self.PAST_DATE_B)
439
440
# Run every test case defined in this module when executed as a script.
if __name__ == '__main__':
  unittest.main()
443