1#!/usr/bin/env python2.4
2"""A set of unit tests for httplib2.py."""
3
4__author__ = "Joe Gregorio (joe@bitworking.org)"
5__copyright__ = "Copyright 2006, Joe Gregorio"
6__contributors__ = []
7__license__ = "MIT"
8__version__ = "0.1 ($Rev: 118 $)"
9
10import base64
11import httplib
12import httplib2
13import os
14import pickle
15import socket
16import StringIO
17import sys
18import time
19import unittest
20import urlparse
21
22try:
23    import ssl
24except ImportError:
25    pass
26
27# Python 2.3 support
28if not hasattr(unittest.TestCase, "assertTrue"):
29    unittest.TestCase.assertTrue = unittest.TestCase.failUnless
30    unittest.TestCase.assertFalse = unittest.TestCase.failIf
31
32# The test resources base uri
33base = "http://bitworking.org/projects/httplib2/test/"
34# base = 'http://localhost/projects/httplib2/test/'
35cacheDirName = ".cache"
36
37
38class CredentialsTest(unittest.TestCase):
39    def test(self):
40        c = httplib2.Credentials()
41        c.add("joe", "password")
42        self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
43        self.assertEqual(("joe", "password"), list(c.iter(""))[0])
44        c.add("fred", "password2", "wellformedweb.org")
45        self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
46        self.assertEqual(1, len(list(c.iter("bitworking.org"))))
47        self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
48        self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
49        c.clear()
50        self.assertEqual(0, len(list(c.iter("bitworking.org"))))
51        c.add("fred", "password2", "wellformedweb.org")
52        self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
53        self.assertEqual(0, len(list(c.iter("bitworking.org"))))
54        self.assertEqual(0, len(list(c.iter(""))))
55
56
57class ParserTest(unittest.TestCase):
58    def testFromStd66(self):
59        self.assertEqual(
60            ("http", "example.com", "", None, None),
61            httplib2.parse_uri("http://example.com"),
62        )
63        self.assertEqual(
64            ("https", "example.com", "", None, None),
65            httplib2.parse_uri("https://example.com"),
66        )
67        self.assertEqual(
68            ("https", "example.com:8080", "", None, None),
69            httplib2.parse_uri("https://example.com:8080"),
70        )
71        self.assertEqual(
72            ("http", "example.com", "/", None, None),
73            httplib2.parse_uri("http://example.com/"),
74        )
75        self.assertEqual(
76            ("http", "example.com", "/path", None, None),
77            httplib2.parse_uri("http://example.com/path"),
78        )
79        self.assertEqual(
80            ("http", "example.com", "/path", "a=1&b=2", None),
81            httplib2.parse_uri("http://example.com/path?a=1&b=2"),
82        )
83        self.assertEqual(
84            ("http", "example.com", "/path", "a=1&b=2", "fred"),
85            httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"),
86        )
87        self.assertEqual(
88            ("http", "example.com", "/path", "a=1&b=2", "fred"),
89            httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"),
90        )
91
92
93class UrlNormTest(unittest.TestCase):
94    def test(self):
95        self.assertEqual(
96            "http://example.org/", httplib2.urlnorm("http://example.org")[-1]
97        )
98        self.assertEqual(
99            "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1]
100        )
101        self.assertEqual(
102            "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1]
103        )
104        self.assertEqual(
105            "http://example.org/mypath?a=b",
106            httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1],
107        )
108        self.assertEqual(
109            "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1]
110        )
111        self.assertEqual(
112            httplib2.urlnorm("http://localhost:80/"),
113            httplib2.urlnorm("HTTP://LOCALHOST:80"),
114        )
115        try:
116            httplib2.urlnorm("/")
117            self.fail("Non-absolute URIs should raise an exception")
118        except httplib2.RelativeURIError:
119            pass
120
121
122class UrlSafenameTest(unittest.TestCase):
123    def test(self):
124        # Test that different URIs end up generating different safe names
125        self.assertEqual(
126            "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f",
127            httplib2.safename("http://example.org/fred/?a=b"),
128        )
129        self.assertEqual(
130            "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b",
131            httplib2.safename("http://example.org/fred?/a=b"),
132        )
133        self.assertEqual(
134            "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968",
135            httplib2.safename("http://www.example.org/fred?/a=b"),
136        )
137        self.assertEqual(
138            httplib2.safename(httplib2.urlnorm("http://www")[-1]),
139            httplib2.safename(httplib2.urlnorm("http://WWW")[-1]),
140        )
141        self.assertEqual(
142            "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d",
143            httplib2.safename("https://www.example.org/fred?/a=b"),
144        )
145        self.assertNotEqual(
146            httplib2.safename("http://www"), httplib2.safename("https://www")
147        )
148        # Test the max length limits
149        uri = "http://" + ("w" * 200) + ".org"
150        uri2 = "http://" + ("w" * 201) + ".org"
151        self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
152        # Max length should be 200 + 1 (",") + 32
153        self.assertEqual(233, len(httplib2.safename(uri2)))
154        self.assertEqual(233, len(httplib2.safename(uri)))
155        # Unicode
156        if sys.version_info >= (2, 3):
157            self.assertEqual(
158                "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193",
159                httplib2.safename(u"http://\u2304.org/fred/?a=b"),
160            )
161
162
163class _MyResponse(StringIO.StringIO):
164    def __init__(self, body, **kwargs):
165        StringIO.StringIO.__init__(self, body)
166        self.headers = kwargs
167
168    def iteritems(self):
169        return self.headers.iteritems()
170
171
172class _MyHTTPConnection(object):
173    "This class is just a mock of httplib.HTTPConnection used for testing"
174
175    def __init__(
176        self,
177        host,
178        port=None,
179        key_file=None,
180        cert_file=None,
181        strict=None,
182        timeout=None,
183        proxy_info=None,
184    ):
185        self.host = host
186        self.port = port
187        self.timeout = timeout
188        self.log = ""
189        self.sock = None
190
191    def set_debuglevel(self, level):
192        pass
193
194    def connect(self):
195        "Connect to a host on a given port."
196        pass
197
198    def close(self):
199        pass
200
201    def request(self, method, request_uri, body, headers):
202        pass
203
204    def getresponse(self):
205        return _MyResponse("the body", status="200")
206
207
208class _MyHTTPBadStatusConnection(object):
209    "Mock of httplib.HTTPConnection that raises BadStatusLine."
210
211    num_calls = 0
212
213    def __init__(
214        self,
215        host,
216        port=None,
217        key_file=None,
218        cert_file=None,
219        strict=None,
220        timeout=None,
221        proxy_info=None,
222    ):
223        self.host = host
224        self.port = port
225        self.timeout = timeout
226        self.log = ""
227        self.sock = None
228        _MyHTTPBadStatusConnection.num_calls = 0
229
230    def set_debuglevel(self, level):
231        pass
232
233    def connect(self):
234        pass
235
236    def close(self):
237        pass
238
239    def request(self, method, request_uri, body, headers):
240        pass
241
242    def getresponse(self):
243        _MyHTTPBadStatusConnection.num_calls += 1
244        raise httplib.BadStatusLine("")
245
246
247class HttpTest(unittest.TestCase):
248    def setUp(self):
249        if os.path.exists(cacheDirName):
250            [
251                os.remove(os.path.join(cacheDirName, file))
252                for file in os.listdir(cacheDirName)
253            ]
254
255        if sys.version_info < (2, 6):
256            disable_cert_validation = True
257        else:
258            disable_cert_validation = False
259        self.http = httplib2.Http(
260            cacheDirName, disable_ssl_certificate_validation=disable_cert_validation
261        )
262        self.http.clear_credentials()
263
264    def testIPv6NoSSL(self):
265        try:
266            self.http.request("http://[::1]/")
267        except socket.gaierror:
268            self.fail("should get the address family right for IPv6")
269        except socket.error:
270            # Even if IPv6 isn't installed on a machine it should just raise socket.error
271            pass
272
273    def testIPv6SSL(self):
274        try:
275            self.http.request("https://[::1]/")
276        except socket.gaierror:
277            self.fail("should get the address family right for IPv6")
278        except httplib2.CertificateHostnameMismatch:
279            # We connected and verified that the certificate doesn't match
280            #  the name. Good enough.
281            pass
282        except socket.error:
283            # Even if IPv6 isn't installed on a machine it should just raise socket.error
284            pass
285
286    def testConnectionType(self):
287        self.http.force_exception_to_status_code = False
288        response, content = self.http.request(
289            "http://bitworking.org", connection_type=_MyHTTPConnection
290        )
291        self.assertEqual(response["content-location"], "http://bitworking.org")
292        self.assertEqual(content, "the body")
293
294    def testBadStatusLineRetry(self):
295        old_retries = httplib2.RETRIES
296        httplib2.RETRIES = 1
297        self.http.force_exception_to_status_code = False
298        try:
299            response, content = self.http.request(
300                "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection
301            )
302        except httplib.BadStatusLine:
303            self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls)
304        httplib2.RETRIES = old_retries
305
306    def testGetUnknownServer(self):
307        self.http.force_exception_to_status_code = False
308        try:
309            self.http.request("http://fred.bitworking.org/")
310            self.fail(
311                "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server."
312            )
313        except httplib2.ServerNotFoundError:
314            pass
315
316        # Now test with exceptions turned off
317        self.http.force_exception_to_status_code = True
318
319        (response, content) = self.http.request("http://fred.bitworking.org/")
320        self.assertEqual(response["content-type"], "text/plain")
321        self.assertTrue(content.startswith("Unable to find"))
322        self.assertEqual(response.status, 400)
323
324    def testGetConnectionRefused(self):
325        self.http.force_exception_to_status_code = False
326        try:
327            self.http.request("http://localhost:7777/")
328            self.fail("An socket.error exception must be thrown on Connection Refused.")
329        except socket.error:
330            pass
331
332        # Now test with exceptions turned off
333        self.http.force_exception_to_status_code = True
334
335        (response, content) = self.http.request("http://localhost:7777/")
336        self.assertEqual(response["content-type"], "text/plain")
337        self.assertTrue(
338            "Connection refused" in content or "actively refused" in content,
339            "Unexpected status %(content)s" % vars(),
340        )
341        self.assertEqual(response.status, 400)
342
343    def testGetIRI(self):
344        if sys.version_info >= (2, 3):
345            uri = urlparse.urljoin(
346                base, u"reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}"
347            )
348            (response, content) = self.http.request(uri, "GET")
349            d = self.reflector(content)
350            self.assertTrue("QUERY_STRING" in d)
351            self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0)
352
353    def testGetIsDefaultMethod(self):
354        # Test that GET is the default method
355        uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
356        (response, content) = self.http.request(uri)
357        self.assertEqual(response["x-method"], "GET")
358
359    def testDifferentMethods(self):
360        # Test that all methods can be used
361        uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
362        for method in ["GET", "PUT", "DELETE", "POST"]:
363            (response, content) = self.http.request(uri, method, body=" ")
364            self.assertEqual(response["x-method"], method)
365
366    def testHeadRead(self):
367        # Test that we don't try to read the response of a HEAD request
368        # since httplib blocks response.read() for HEAD requests.
369        # Oddly enough this doesn't appear as a problem when doing HEAD requests
370        # against Apache servers.
371        uri = "http://www.google.com/"
372        (response, content) = self.http.request(uri, "HEAD")
373        self.assertEqual(response.status, 200)
374        self.assertEqual(content, "")
375
376    def testGetNoCache(self):
377        # Test that can do a GET w/o the cache turned on.
378        http = httplib2.Http()
379        uri = urlparse.urljoin(base, "304/test_etag.txt")
380        (response, content) = http.request(uri, "GET")
381        self.assertEqual(response.status, 200)
382        self.assertEqual(response.previous, None)
383
384    def testGetOnlyIfCachedCacheHit(self):
385        # Test that can do a GET with cache and 'only-if-cached'
386        uri = urlparse.urljoin(base, "304/test_etag.txt")
387        (response, content) = self.http.request(uri, "GET")
388        (response, content) = self.http.request(
389            uri, "GET", headers={"cache-control": "only-if-cached"}
390        )
391        self.assertEqual(response.fromcache, True)
392        self.assertEqual(response.status, 200)
393
394    def testGetOnlyIfCachedCacheMiss(self):
395        # Test that can do a GET with no cache with 'only-if-cached'
396        uri = urlparse.urljoin(base, "304/test_etag.txt")
397        (response, content) = self.http.request(
398            uri, "GET", headers={"cache-control": "only-if-cached"}
399        )
400        self.assertEqual(response.fromcache, False)
401        self.assertEqual(response.status, 504)
402
403    def testGetOnlyIfCachedNoCacheAtAll(self):
404        # Test that can do a GET with no cache with 'only-if-cached'
405        # Of course, there might be an intermediary beyond us
406        # that responds to the 'only-if-cached', so this
407        # test can't really be guaranteed to pass.
408        http = httplib2.Http()
409        uri = urlparse.urljoin(base, "304/test_etag.txt")
410        (response, content) = http.request(
411            uri, "GET", headers={"cache-control": "only-if-cached"}
412        )
413        self.assertEqual(response.fromcache, False)
414        self.assertEqual(response.status, 504)
415
416    def testUserAgent(self):
417        # Test that we provide a default user-agent
418        uri = urlparse.urljoin(base, "user-agent/test.cgi")
419        (response, content) = self.http.request(uri, "GET")
420        self.assertEqual(response.status, 200)
421        self.assertTrue(content.startswith("Python-httplib2/"))
422
423    def testUserAgentNonDefault(self):
424        # Test that the default user-agent can be over-ridden
425
426        uri = urlparse.urljoin(base, "user-agent/test.cgi")
427        (response, content) = self.http.request(
428            uri, "GET", headers={"User-Agent": "fred/1.0"}
429        )
430        self.assertEqual(response.status, 200)
431        self.assertTrue(content.startswith("fred/1.0"))
432
433    def testGet300WithLocation(self):
434        # Test the we automatically follow 300 redirects if a Location: header is provided
435        uri = urlparse.urljoin(base, "300/with-location-header.asis")
436        (response, content) = self.http.request(uri, "GET")
437        self.assertEqual(response.status, 200)
438        self.assertEqual(content, "This is the final destination.\n")
439        self.assertEqual(response.previous.status, 300)
440        self.assertEqual(response.previous.fromcache, False)
441
442        # Confirm that the intermediate 300 is not cached
443        (response, content) = self.http.request(uri, "GET")
444        self.assertEqual(response.status, 200)
445        self.assertEqual(content, "This is the final destination.\n")
446        self.assertEqual(response.previous.status, 300)
447        self.assertEqual(response.previous.fromcache, False)
448
449    def testGet300WithLocationNoRedirect(self):
450        # Test the we automatically follow 300 redirects if a Location: header is provided
451        self.http.follow_redirects = False
452        uri = urlparse.urljoin(base, "300/with-location-header.asis")
453        (response, content) = self.http.request(uri, "GET")
454        self.assertEqual(response.status, 300)
455
456    def testGet300WithoutLocation(self):
457        # Not giving a Location: header in a 300 response is acceptable
458        # In which case we just return the 300 response
459        uri = urlparse.urljoin(base, "300/without-location-header.asis")
460        (response, content) = self.http.request(uri, "GET")
461        self.assertEqual(response.status, 300)
462        self.assertTrue(response["content-type"].startswith("text/html"))
463        self.assertEqual(response.previous, None)
464
465    def testGet301(self):
466        # Test that we automatically follow 301 redirects
467        # and that we cache the 301 response
468        uri = urlparse.urljoin(base, "301/onestep.asis")
469        destination = urlparse.urljoin(base, "302/final-destination.txt")
470        (response, content) = self.http.request(uri, "GET")
471        self.assertEqual(response.status, 200)
472        self.assertTrue("content-location" in response)
473        self.assertEqual(response["content-location"], destination)
474        self.assertEqual(content, "This is the final destination.\n")
475        self.assertEqual(response.previous.status, 301)
476        self.assertEqual(response.previous.fromcache, False)
477
478        (response, content) = self.http.request(uri, "GET")
479        self.assertEqual(response.status, 200)
480        self.assertEqual(response["content-location"], destination)
481        self.assertEqual(content, "This is the final destination.\n")
482        self.assertEqual(response.previous.status, 301)
483        self.assertEqual(response.previous.fromcache, True)
484
485    def testHead301(self):
486        # Test that we automatically follow 301 redirects
487        uri = urlparse.urljoin(base, "301/onestep.asis")
488        destination = urlparse.urljoin(base, "302/final-destination.txt")
489        (response, content) = self.http.request(uri, "HEAD")
490        self.assertEqual(response.status, 200)
491        self.assertEqual(response.previous.status, 301)
492        self.assertEqual(response.previous.fromcache, False)
493
494    def testGet301NoRedirect(self):
495        # Test that we automatically follow 301 redirects
496        # and that we cache the 301 response
497        self.http.follow_redirects = False
498        uri = urlparse.urljoin(base, "301/onestep.asis")
499        destination = urlparse.urljoin(base, "302/final-destination.txt")
500        (response, content) = self.http.request(uri, "GET")
501        self.assertEqual(response.status, 301)
502
503    def testGet302(self):
504        # Test that we automatically follow 302 redirects
505        # and that we DO NOT cache the 302 response
506        uri = urlparse.urljoin(base, "302/onestep.asis")
507        destination = urlparse.urljoin(base, "302/final-destination.txt")
508        (response, content) = self.http.request(uri, "GET")
509        self.assertEqual(response.status, 200)
510        self.assertEqual(response["content-location"], destination)
511        self.assertEqual(content, "This is the final destination.\n")
512        self.assertEqual(response.previous.status, 302)
513        self.assertEqual(response.previous.fromcache, False)
514
515        uri = urlparse.urljoin(base, "302/onestep.asis")
516        (response, content) = self.http.request(uri, "GET")
517        self.assertEqual(response.status, 200)
518        self.assertEqual(response.fromcache, True)
519        self.assertEqual(response["content-location"], destination)
520        self.assertEqual(content, "This is the final destination.\n")
521        self.assertEqual(response.previous.status, 302)
522        self.assertEqual(response.previous.fromcache, False)
523        self.assertEqual(response.previous["content-location"], uri)
524
525        uri = urlparse.urljoin(base, "302/twostep.asis")
526
527        (response, content) = self.http.request(uri, "GET")
528        self.assertEqual(response.status, 200)
529        self.assertEqual(response.fromcache, True)
530        self.assertEqual(content, "This is the final destination.\n")
531        self.assertEqual(response.previous.status, 302)
532        self.assertEqual(response.previous.fromcache, False)
533
534    def testGet302RedirectionLimit(self):
535        # Test that we can set a lower redirection limit
536        # and that we raise an exception when we exceed
537        # that limit.
538        self.http.force_exception_to_status_code = False
539
540        uri = urlparse.urljoin(base, "302/twostep.asis")
541        try:
542            (response, content) = self.http.request(uri, "GET", redirections=1)
543            self.fail("This should not happen")
544        except httplib2.RedirectLimit:
545            pass
546        except Exception as e:
547            self.fail("Threw wrong kind of exception ")
548
549        # Re-run the test with out the exceptions
550        self.http.force_exception_to_status_code = True
551
552        (response, content) = self.http.request(uri, "GET", redirections=1)
553        self.assertEqual(response.status, 500)
554        self.assertTrue(response.reason.startswith("Redirected more"))
555        self.assertEqual("302", response["status"])
556        self.assertTrue(content.startswith("<html>"))
557        self.assertTrue(response.previous != None)
558
559    def testGet302NoLocation(self):
560        # Test that we throw an exception when we get
561        # a 302 with no Location: header.
562        self.http.force_exception_to_status_code = False
563        uri = urlparse.urljoin(base, "302/no-location.asis")
564        try:
565            (response, content) = self.http.request(uri, "GET")
566            self.fail("Should never reach here")
567        except httplib2.RedirectMissingLocation:
568            pass
569        except Exception as e:
570            self.fail("Threw wrong kind of exception ")
571
572        # Re-run the test with out the exceptions
573        self.http.force_exception_to_status_code = True
574
575        (response, content) = self.http.request(uri, "GET")
576        self.assertEqual(response.status, 500)
577        self.assertTrue(response.reason.startswith("Redirected but"))
578        self.assertEqual("302", response["status"])
579        self.assertTrue(content.startswith("This is content"))
580
581    def testGet301ViaHttps(self):
582        # Google always redirects to https://www.google.com
583        (response, content) = self.http.request("https://code.google.com/apis/", "GET")
584        self.assertEqual(200, response.status)
585        self.assertEqual(301, response.previous.status)
586
587    def testGetViaHttps(self):
588        # Test that we can handle HTTPS
589        (response, content) = self.http.request(
590            "https://www.google.com/adsense/", "GET"
591        )
592        self.assertEqual(200, response.status)
593
594    def testGetViaHttpsSpecViolationOnLocation(self):
595        # Test that we follow redirects through HTTPS
596        # even if they violate the spec by including
597        # a relative Location: header instead of an
598        # absolute one.
599        (response, content) = self.http.request("https://www.google.com/adsense", "GET")
600        self.assertEqual(200, response.status)
601        self.assertNotEqual(None, response.previous)
602
603    def testSslCertValidationDoubleDots(self):
604        pass
605        # No longer a valid test.
606        # if sys.version_info >= (2, 6):
607        # Test that we get match a double dot cert
608        # try:
609        #  self.http.request("https://www.appspot.com/", "GET")
610        # except httplib2.CertificateHostnameMismatch:
611        #  self.fail('cert with *.*.appspot.com should not raise an exception.')
612
613    def testSslHostnameValidation(self):
614        pass
615        # No longer a valid test.
616        # if sys.version_info >= (2, 6):
617        # The SSL server at google.com:443 returns a certificate for
618        # 'www.google.com', which results in a host name mismatch.
619        # Note that this test only works because the ssl module and httplib2
620        # do not support SNI; for requests specifying a server name of
621        # 'google.com' via SNI, a matching cert would be returned.
622        #    self.assertRaises(httplib2.CertificateHostnameMismatch,
623        #            self.http.request, "https://google.com/", "GET")
624
625    def testSslCertValidationWithoutSslModuleFails(self):
626        if sys.version_info < (2, 6):
627            http = httplib2.Http(disable_ssl_certificate_validation=False)
628            self.assertRaises(
629                httplib2.CertificateValidationUnsupported,
630                http.request,
631                "https://www.google.com/",
632                "GET",
633            )
634
635    def testGetViaHttpsKeyCert(self):
636        #  At this point I can only test
637        #  that the key and cert files are passed in
638        #  correctly to httplib. It would be nice to have
639        #  a real https endpoint to test against.
640
641        # bitworking.org presents an certificate for a non-matching host
642        # (*.webfaction.com), so we need to disable cert checking for this test.
643        http = httplib2.Http(timeout=2, disable_ssl_certificate_validation=True)
644
645        http.add_certificate("akeyfile", "acertfile", "bitworking.org")
646        try:
647            (response, content) = http.request("https://bitworking.org", "GET")
648        except:
649            pass
650        self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
651        self.assertEqual(
652            http.connections["https:bitworking.org"].cert_file, "acertfile"
653        )
654
655        try:
656            (response, content) = http.request("https://notthere.bitworking.org", "GET")
657        except:
658            pass
659        self.assertEqual(
660            http.connections["https:notthere.bitworking.org"].key_file, None
661        )
662        self.assertEqual(
663            http.connections["https:notthere.bitworking.org"].cert_file, None
664        )
665
666    def testGet303(self):
667        # Do a follow-up GET on a Location: header
668        # returned from a POST that gave a 303.
669        uri = urlparse.urljoin(base, "303/303.cgi")
670        (response, content) = self.http.request(uri, "POST", " ")
671        self.assertEqual(response.status, 200)
672        self.assertEqual(content, "This is the final destination.\n")
673        self.assertEqual(response.previous.status, 303)
674
675    def testGet303NoRedirect(self):
676        # Do a follow-up GET on a Location: header
677        # returned from a POST that gave a 303.
678        self.http.follow_redirects = False
679        uri = urlparse.urljoin(base, "303/303.cgi")
680        (response, content) = self.http.request(uri, "POST", " ")
681        self.assertEqual(response.status, 303)
682
683    def test303ForDifferentMethods(self):
684        # Test that all methods can be used
685        uri = urlparse.urljoin(base, "303/redirect-to-reflector.cgi")
686        for (method, method_on_303) in [
687            ("PUT", "GET"),
688            ("DELETE", "GET"),
689            ("POST", "GET"),
690            ("GET", "GET"),
691            ("HEAD", "GET"),
692        ]:
693            (response, content) = self.http.request(uri, method, body=" ")
694            self.assertEqual(response["x-method"], method_on_303)
695
696    def test303AndForwardAuthorizationHeader(self):
697        # Test that all methods can be used
698        uri = urlparse.urljoin(base, "303/redirect-to-header-reflector.cgi")
699        headers = {"authorization": "Bearer foo"}
700        response, content = self.http.request(uri, "GET", body=" ", headers=headers)
701        # self.assertTrue('authorization' not in content)
702        self.http.follow_all_redirects = True
703        self.http.forward_authorization_headers = True
704        response, content = self.http.request(uri, "GET", body=" ", headers=headers)
705        # Oh, how I wish Apache didn't eat the Authorization header.
706        # self.assertTrue('authorization' in content)
707
708    def testGet304(self):
709        # Test that we use ETags properly to validate our cache
710        uri = urlparse.urljoin(base, "304/test_etag.txt")
711        (response, content) = self.http.request(
712            uri, "GET", headers={"accept-encoding": "identity"}
713        )
714        self.assertNotEqual(response["etag"], "")
715
716        (response, content) = self.http.request(uri, "GET")
717        (response, content) = self.http.request(
718            uri, "GET", headers={"cache-control": "must-revalidate"}
719        )
720        self.assertEqual(response.status, 200)
721        self.assertEqual(response.fromcache, True)
722
723        cache_file_name = os.path.join(
724            cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1])
725        )
726        f = open(cache_file_name, "r")
727        status_line = f.readline()
728        f.close()
729
730        self.assertTrue(status_line.startswith("status:"))
731
732        (response, content) = self.http.request(uri, "HEAD")
733        self.assertEqual(response.status, 200)
734        self.assertEqual(response.fromcache, True)
735
736        (response, content) = self.http.request(
737            uri, "GET", headers={"range": "bytes=0-0"}
738        )
739        self.assertEqual(response.status, 206)
740        self.assertEqual(response.fromcache, False)
741
742    def testGetIgnoreEtag(self):
743        # Test that we can forcibly ignore ETags
744        uri = urlparse.urljoin(base, "reflector/reflector.cgi")
745        (response, content) = self.http.request(
746            uri, "GET", headers={"accept-encoding": "identity"}
747        )
748        self.assertNotEqual(response["etag"], "")
749
750        (response, content) = self.http.request(
751            uri,
752            "GET",
753            headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
754        )
755        d = self.reflector(content)
756        self.assertTrue("HTTP_IF_NONE_MATCH" in d)
757
758        self.http.ignore_etag = True
759        (response, content) = self.http.request(
760            uri,
761            "GET",
762            headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
763        )
764        d = self.reflector(content)
765        self.assertEqual(response.fromcache, False)
766        self.assertFalse("HTTP_IF_NONE_MATCH" in d)
767
768    def testOverrideEtag(self):
769        # Test that we can forcibly ignore ETags
770        uri = urlparse.urljoin(base, "reflector/reflector.cgi")
771        (response, content) = self.http.request(
772            uri, "GET", headers={"accept-encoding": "identity"}
773        )
774        self.assertNotEqual(response["etag"], "")
775
776        (response, content) = self.http.request(
777            uri,
778            "GET",
779            headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
780        )
781        d = self.reflector(content)
782        self.assertTrue("HTTP_IF_NONE_MATCH" in d)
783        self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred")
784
785        (response, content) = self.http.request(
786            uri,
787            "GET",
788            headers={
789                "accept-encoding": "identity",
790                "cache-control": "max-age=0",
791                "if-none-match": "fred",
792            },
793        )
794        d = self.reflector(content)
795        self.assertTrue("HTTP_IF_NONE_MATCH" in d)
796        self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred")
797
798    # MAP-commented this out because it consistently fails
799    #    def testGet304EndToEnd(self):
800    #       # Test that end to end headers get overwritten in the cache
801    #        uri = urlparse.urljoin(base, "304/end2end.cgi")
802    #        (response, content) = self.http.request(uri, "GET")
803    #        self.assertNotEqual(response['etag'], "")
804    #        old_date = response['date']
805    #        time.sleep(2)
806    #
807    #        (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
808    #        # The response should be from the cache, but the Date: header should be updated.
809    #        new_date = response['date']
810    #        self.assertNotEqual(new_date, old_date)
811    #        self.assertEqual(response.status, 200)
812    #        self.assertEqual(response.fromcache, True)
813
814    def testGet304LastModified(self):
815        # Test that we can still handle a 304
816        # by only using the last-modified cache validator.
817        uri = urlparse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
818        (response, content) = self.http.request(uri, "GET")
819
820        self.assertNotEqual(response["last-modified"], "")
821        (response, content) = self.http.request(uri, "GET")
822        (response, content) = self.http.request(uri, "GET")
823        self.assertEqual(response.status, 200)
824        self.assertEqual(response.fromcache, True)
825
826    def testGet307(self):
827        # Test that we do follow 307 redirects but
828        # do not cache the 307
829        uri = urlparse.urljoin(base, "307/onestep.asis")
830        (response, content) = self.http.request(uri, "GET")
831        self.assertEqual(response.status, 200)
832        self.assertEqual(content, "This is the final destination.\n")
833        self.assertEqual(response.previous.status, 307)
834        self.assertEqual(response.previous.fromcache, False)
835
836        (response, content) = self.http.request(uri, "GET")
837        self.assertEqual(response.status, 200)
838        self.assertEqual(response.fromcache, True)
839        self.assertEqual(content, "This is the final destination.\n")
840        self.assertEqual(response.previous.status, 307)
841        self.assertEqual(response.previous.fromcache, False)
842
843    def testGet410(self):
844        # Test that we pass 410's through
845        uri = urlparse.urljoin(base, "410/410.asis")
846        (response, content) = self.http.request(uri, "GET")
847        self.assertEqual(response.status, 410)
848
849    def testVaryHeaderSimple(self):
850        """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request.
851
852        """
853        # test that the vary header is sent
854        uri = urlparse.urljoin(base, "vary/accept.asis")
855        (response, content) = self.http.request(
856            uri, "GET", headers={"Accept": "text/plain"}
857        )
858        self.assertEqual(response.status, 200)
859        self.assertTrue("vary" in response)
860
861        # get the resource again, from the cache since accept header in this
862        # request is the same as the request
863        (response, content) = self.http.request(
864            uri, "GET", headers={"Accept": "text/plain"}
865        )
866        self.assertEqual(response.status, 200)
867        self.assertEqual(response.fromcache, True, msg="Should be from cache")
868
869        # get the resource again, not from cache since Accept headers does not match
870        (response, content) = self.http.request(
871            uri, "GET", headers={"Accept": "text/html"}
872        )
873        self.assertEqual(response.status, 200)
874        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
875
876        # get the resource again, without any Accept header, so again no match
877        (response, content) = self.http.request(uri, "GET")
878        self.assertEqual(response.status, 200)
879        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
880
881    def testNoVary(self):
882        pass
883        # when there is no vary, a different Accept header (e.g.) should not
884        # impact if the cache is used
885        # test that the vary header is not sent
886        # uri = urlparse.urljoin(base, "vary/no-vary.asis")
887        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
888        # self.assertEqual(response.status, 200)
889        # self.assertFalse(response.has_key('vary'))
890
891        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
892        # self.assertEqual(response.status, 200)
893        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
894        #
895        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
896        # self.assertEqual(response.status, 200)
897        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
898
899    def testVaryHeaderDouble(self):
900        uri = urlparse.urljoin(base, "vary/accept-double.asis")
901        (response, content) = self.http.request(
902            uri,
903            "GET",
904            headers={
905                "Accept": "text/plain",
906                "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
907            },
908        )
909        self.assertEqual(response.status, 200)
910        self.assertTrue("vary" in response)
911
912        # we are from cache
913        (response, content) = self.http.request(
914            uri,
915            "GET",
916            headers={
917                "Accept": "text/plain",
918                "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
919            },
920        )
921        self.assertEqual(response.fromcache, True, msg="Should be from cache")
922
923        (response, content) = self.http.request(
924            uri, "GET", headers={"Accept": "text/plain"}
925        )
926        self.assertEqual(response.status, 200)
927        self.assertEqual(response.fromcache, False)
928
929        # get the resource again, not from cache, varied headers don't match exact
930        (response, content) = self.http.request(
931            uri, "GET", headers={"Accept-Language": "da"}
932        )
933        self.assertEqual(response.status, 200)
934        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
935
936    def testVaryUnusedHeader(self):
937        # A header's value is not considered to vary if it's not used at all.
938        uri = urlparse.urljoin(base, "vary/unused-header.asis")
939        (response, content) = self.http.request(
940            uri, "GET", headers={"Accept": "text/plain"}
941        )
942        self.assertEqual(response.status, 200)
943        self.assertTrue("vary" in response)
944
945        # we are from cache
946        (response, content) = self.http.request(
947            uri, "GET", headers={"Accept": "text/plain"}
948        )
949        self.assertEqual(response.fromcache, True, msg="Should be from cache")
950
951    def testHeadGZip(self):
952        # Test that we don't try to decompress a HEAD response
953        uri = urlparse.urljoin(base, "gzip/final-destination.txt")
954        (response, content) = self.http.request(uri, "HEAD")
955        self.assertEqual(response.status, 200)
956        self.assertNotEqual(int(response["content-length"]), 0)
957        self.assertEqual(content, "")
958
959    def testGetGZip(self):
960        # Test that we support gzip compression
961        uri = urlparse.urljoin(base, "gzip/final-destination.txt")
962        (response, content) = self.http.request(uri, "GET")
963        self.assertEqual(response.status, 200)
964        self.assertFalse("content-encoding" in response)
965        self.assertTrue("-content-encoding" in response)
966        self.assertEqual(
967            int(response["content-length"]), len("This is the final destination.\n")
968        )
969        self.assertEqual(content, "This is the final destination.\n")
970
971    def testPostAndGZipResponse(self):
972        uri = urlparse.urljoin(base, "gzip/post.cgi")
973        (response, content) = self.http.request(uri, "POST", body=" ")
974        self.assertEqual(response.status, 200)
975        self.assertFalse("content-encoding" in response)
976        self.assertTrue("-content-encoding" in response)
977
978    def testGetGZipFailure(self):
979        # Test that we raise a good exception when the gzip fails
980        self.http.force_exception_to_status_code = False
981        uri = urlparse.urljoin(base, "gzip/failed-compression.asis")
982        try:
983            (response, content) = self.http.request(uri, "GET")
984            self.fail("Should never reach here")
985        except httplib2.FailedToDecompressContent:
986            pass
987        except Exception:
988            self.fail("Threw wrong kind of exception")
989
990        # Re-run the test with out the exceptions
991        self.http.force_exception_to_status_code = True
992
993        (response, content) = self.http.request(uri, "GET")
994        self.assertEqual(response.status, 500)
995        self.assertTrue(response.reason.startswith("Content purported"))
996
997    def testTimeout(self):
998        self.http.force_exception_to_status_code = True
999        uri = urlparse.urljoin(base, "timeout/timeout.cgi")
1000        try:
1001            import socket
1002
1003            socket.setdefaulttimeout(1)
1004        except:
1005            # Don't run the test if we can't set the timeout
1006            return
1007        (response, content) = self.http.request(uri)
1008        self.assertEqual(response.status, 408)
1009        self.assertTrue(response.reason.startswith("Request Timeout"))
1010        self.assertTrue(content.startswith("Request Timeout"))
1011
1012    def testIndividualTimeout(self):
1013        uri = urlparse.urljoin(base, "timeout/timeout.cgi")
1014        http = httplib2.Http(timeout=1)
1015        http.force_exception_to_status_code = True
1016
1017        (response, content) = http.request(uri)
1018        self.assertEqual(response.status, 408)
1019        self.assertTrue(response.reason.startswith("Request Timeout"))
1020        self.assertTrue(content.startswith("Request Timeout"))
1021
1022    def testHTTPSInitTimeout(self):
1023        c = httplib2.HTTPSConnectionWithTimeout("localhost", 80, timeout=47)
1024        self.assertEqual(47, c.timeout)
1025
1026    def testGetDeflate(self):
1027        # Test that we support deflate compression
1028        uri = urlparse.urljoin(base, "deflate/deflated.asis")
1029        (response, content) = self.http.request(uri, "GET")
1030        self.assertEqual(response.status, 200)
1031        self.assertFalse("content-encoding" in response)
1032        self.assertEqual(
1033            int(response["content-length"]), len("This is the final destination.")
1034        )
1035        self.assertEqual(content, "This is the final destination.")
1036
1037    def testGetDeflateFailure(self):
1038        # Test that we raise a good exception when the deflate fails
1039        self.http.force_exception_to_status_code = False
1040
1041        uri = urlparse.urljoin(base, "deflate/failed-compression.asis")
1042        try:
1043            (response, content) = self.http.request(uri, "GET")
1044            self.fail("Should never reach here")
1045        except httplib2.FailedToDecompressContent:
1046            pass
1047        except Exception:
1048            self.fail("Threw wrong kind of exception")
1049
1050        # Re-run the test with out the exceptions
1051        self.http.force_exception_to_status_code = True
1052
1053        (response, content) = self.http.request(uri, "GET")
1054        self.assertEqual(response.status, 500)
1055        self.assertTrue(response.reason.startswith("Content purported"))
1056
1057    def testGetDuplicateHeaders(self):
1058        # Test that duplicate headers get concatenated via ','
1059        uri = urlparse.urljoin(base, "duplicate-headers/multilink.asis")
1060        (response, content) = self.http.request(uri, "GET")
1061        self.assertEqual(response.status, 200)
1062        self.assertEqual(content, "This is content\n")
1063        self.assertEqual(
1064            response["link"].split(",")[0],
1065            '<http://bitworking.org>; rel="home"; title="BitWorking"',
1066        )
1067
1068    def testGetCacheControlNoCache(self):
1069        # Test Cache-Control: no-cache on requests
1070        uri = urlparse.urljoin(base, "304/test_etag.txt")
1071        (response, content) = self.http.request(
1072            uri, "GET", headers={"accept-encoding": "identity"}
1073        )
1074        self.assertNotEqual(response["etag"], "")
1075        (response, content) = self.http.request(
1076            uri, "GET", headers={"accept-encoding": "identity"}
1077        )
1078        self.assertEqual(response.status, 200)
1079        self.assertEqual(response.fromcache, True)
1080
1081        (response, content) = self.http.request(
1082            uri,
1083            "GET",
1084            headers={"accept-encoding": "identity", "Cache-Control": "no-cache"},
1085        )
1086        self.assertEqual(response.status, 200)
1087        self.assertEqual(response.fromcache, False)
1088
1089    def testGetCacheControlPragmaNoCache(self):
1090        # Test Pragma: no-cache on requests
1091        uri = urlparse.urljoin(base, "304/test_etag.txt")
1092        (response, content) = self.http.request(
1093            uri, "GET", headers={"accept-encoding": "identity"}
1094        )
1095        self.assertNotEqual(response["etag"], "")
1096        (response, content) = self.http.request(
1097            uri, "GET", headers={"accept-encoding": "identity"}
1098        )
1099        self.assertEqual(response.status, 200)
1100        self.assertEqual(response.fromcache, True)
1101
1102        (response, content) = self.http.request(
1103            uri, "GET", headers={"accept-encoding": "identity", "Pragma": "no-cache"}
1104        )
1105        self.assertEqual(response.status, 200)
1106        self.assertEqual(response.fromcache, False)
1107
1108    def testGetCacheControlNoStoreRequest(self):
1109        # A no-store request means that the response should not be stored.
1110        uri = urlparse.urljoin(base, "304/test_etag.txt")
1111
1112        (response, content) = self.http.request(
1113            uri, "GET", headers={"Cache-Control": "no-store"}
1114        )
1115        self.assertEqual(response.status, 200)
1116        self.assertEqual(response.fromcache, False)
1117
1118        (response, content) = self.http.request(
1119            uri, "GET", headers={"Cache-Control": "no-store"}
1120        )
1121        self.assertEqual(response.status, 200)
1122        self.assertEqual(response.fromcache, False)
1123
1124    def testGetCacheControlNoStoreResponse(self):
1125        # A no-store response means that the response should not be stored.
1126        uri = urlparse.urljoin(base, "no-store/no-store.asis")
1127
1128        (response, content) = self.http.request(uri, "GET")
1129        self.assertEqual(response.status, 200)
1130        self.assertEqual(response.fromcache, False)
1131
1132        (response, content) = self.http.request(uri, "GET")
1133        self.assertEqual(response.status, 200)
1134        self.assertEqual(response.fromcache, False)
1135
1136    def testGetCacheControlNoCacheNoStoreRequest(self):
1137        # Test that a no-store, no-cache clears the entry from the cache
1138        # even if it was cached previously.
1139        uri = urlparse.urljoin(base, "304/test_etag.txt")
1140
1141        (response, content) = self.http.request(uri, "GET")
1142        (response, content) = self.http.request(uri, "GET")
1143        self.assertEqual(response.fromcache, True)
1144        (response, content) = self.http.request(
1145            uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
1146        )
1147        (response, content) = self.http.request(
1148            uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
1149        )
1150        self.assertEqual(response.status, 200)
1151        self.assertEqual(response.fromcache, False)
1152
1153    def testUpdateInvalidatesCache(self):
1154        # Test that calling PUT or DELETE on a
1155        # URI that is cache invalidates that cache.
1156        uri = urlparse.urljoin(base, "304/test_etag.txt")
1157
1158        (response, content) = self.http.request(uri, "GET")
1159        (response, content) = self.http.request(uri, "GET")
1160        self.assertEqual(response.fromcache, True)
1161        (response, content) = self.http.request(uri, "DELETE")
1162        self.assertEqual(response.status, 405)
1163
1164        (response, content) = self.http.request(uri, "GET")
1165        self.assertEqual(response.fromcache, False)
1166
1167    def testUpdateUsesCachedETag(self):
1168        # Test that we natively support http://www.w3.org/1999/04/Editing/
1169        uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
1170
1171        (response, content) = self.http.request(uri, "GET")
1172        self.assertEqual(response.status, 200)
1173        self.assertEqual(response.fromcache, False)
1174        (response, content) = self.http.request(uri, "GET")
1175        self.assertEqual(response.status, 200)
1176        self.assertEqual(response.fromcache, True)
1177        (response, content) = self.http.request(uri, "PUT", body="foo")
1178        self.assertEqual(response.status, 200)
1179        (response, content) = self.http.request(uri, "PUT", body="foo")
1180        self.assertEqual(response.status, 412)
1181
1182    def testUpdatePatchUsesCachedETag(self):
1183        # Test that we natively support http://www.w3.org/1999/04/Editing/
1184        uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
1185
1186        (response, content) = self.http.request(uri, "GET")
1187        self.assertEqual(response.status, 200)
1188        self.assertEqual(response.fromcache, False)
1189        (response, content) = self.http.request(uri, "GET")
1190        self.assertEqual(response.status, 200)
1191        self.assertEqual(response.fromcache, True)
1192        (response, content) = self.http.request(uri, "PATCH", body="foo")
1193        self.assertEqual(response.status, 200)
1194        (response, content) = self.http.request(uri, "PATCH", body="foo")
1195        self.assertEqual(response.status, 412)
1196
1197    def testUpdateUsesCachedETagAndOCMethod(self):
1198        # Test that we natively support http://www.w3.org/1999/04/Editing/
1199        uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
1200
1201        (response, content) = self.http.request(uri, "GET")
1202        self.assertEqual(response.status, 200)
1203        self.assertEqual(response.fromcache, False)
1204        (response, content) = self.http.request(uri, "GET")
1205        self.assertEqual(response.status, 200)
1206        self.assertEqual(response.fromcache, True)
1207        self.http.optimistic_concurrency_methods.append("DELETE")
1208        (response, content) = self.http.request(uri, "DELETE")
1209        self.assertEqual(response.status, 200)
1210
1211    def testUpdateUsesCachedETagOverridden(self):
1212        # Test that we natively support http://www.w3.org/1999/04/Editing/
1213        uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
1214
1215        (response, content) = self.http.request(uri, "GET")
1216        self.assertEqual(response.status, 200)
1217        self.assertEqual(response.fromcache, False)
1218        (response, content) = self.http.request(uri, "GET")
1219        self.assertEqual(response.status, 200)
1220        self.assertEqual(response.fromcache, True)
1221        (response, content) = self.http.request(
1222            uri, "PUT", body="foo", headers={"if-match": "fred"}
1223        )
1224        self.assertEqual(response.status, 412)
1225
1226    def testBasicAuth(self):
1227        # Test Basic Authentication
1228        uri = urlparse.urljoin(base, "basic/file.txt")
1229        (response, content) = self.http.request(uri, "GET")
1230        self.assertEqual(response.status, 401)
1231
1232        uri = urlparse.urljoin(base, "basic/")
1233        (response, content) = self.http.request(uri, "GET")
1234        self.assertEqual(response.status, 401)
1235
1236        self.http.add_credentials("joe", "password")
1237        (response, content) = self.http.request(uri, "GET")
1238        self.assertEqual(response.status, 200)
1239
1240        uri = urlparse.urljoin(base, "basic/file.txt")
1241        (response, content) = self.http.request(uri, "GET")
1242        self.assertEqual(response.status, 200)
1243
1244    def testBasicAuthWithDomain(self):
1245        # Test Basic Authentication
1246        uri = urlparse.urljoin(base, "basic/file.txt")
1247        (response, content) = self.http.request(uri, "GET")
1248        self.assertEqual(response.status, 401)
1249
1250        uri = urlparse.urljoin(base, "basic/")
1251        (response, content) = self.http.request(uri, "GET")
1252        self.assertEqual(response.status, 401)
1253
1254        self.http.add_credentials("joe", "password", "example.org")
1255        (response, content) = self.http.request(uri, "GET")
1256        self.assertEqual(response.status, 401)
1257
1258        uri = urlparse.urljoin(base, "basic/file.txt")
1259        (response, content) = self.http.request(uri, "GET")
1260        self.assertEqual(response.status, 401)
1261
1262        domain = urlparse.urlparse(base)[1]
1263        self.http.add_credentials("joe", "password", domain)
1264        (response, content) = self.http.request(uri, "GET")
1265        self.assertEqual(response.status, 200)
1266
1267        uri = urlparse.urljoin(base, "basic/file.txt")
1268        (response, content) = self.http.request(uri, "GET")
1269        self.assertEqual(response.status, 200)
1270
1271    def testBasicAuthTwoDifferentCredentials(self):
1272        # Test Basic Authentication with multiple sets of credentials
1273        uri = urlparse.urljoin(base, "basic2/file.txt")
1274        (response, content) = self.http.request(uri, "GET")
1275        self.assertEqual(response.status, 401)
1276
1277        uri = urlparse.urljoin(base, "basic2/")
1278        (response, content) = self.http.request(uri, "GET")
1279        self.assertEqual(response.status, 401)
1280
1281        self.http.add_credentials("fred", "barney")
1282        (response, content) = self.http.request(uri, "GET")
1283        self.assertEqual(response.status, 200)
1284
1285        uri = urlparse.urljoin(base, "basic2/file.txt")
1286        (response, content) = self.http.request(uri, "GET")
1287        self.assertEqual(response.status, 200)
1288
1289    def testBasicAuthNested(self):
1290        # Test Basic Authentication with resources
1291        # that are nested
1292        uri = urlparse.urljoin(base, "basic-nested/")
1293        (response, content) = self.http.request(uri, "GET")
1294        self.assertEqual(response.status, 401)
1295
1296        uri = urlparse.urljoin(base, "basic-nested/subdir")
1297        (response, content) = self.http.request(uri, "GET")
1298        self.assertEqual(response.status, 401)
1299
1300        # Now add in credentials one at a time and test.
1301        self.http.add_credentials("joe", "password")
1302
1303        uri = urlparse.urljoin(base, "basic-nested/")
1304        (response, content) = self.http.request(uri, "GET")
1305        self.assertEqual(response.status, 200)
1306
1307        uri = urlparse.urljoin(base, "basic-nested/subdir")
1308        (response, content) = self.http.request(uri, "GET")
1309        self.assertEqual(response.status, 401)
1310
1311        self.http.add_credentials("fred", "barney")
1312
1313        uri = urlparse.urljoin(base, "basic-nested/")
1314        (response, content) = self.http.request(uri, "GET")
1315        self.assertEqual(response.status, 200)
1316
1317        uri = urlparse.urljoin(base, "basic-nested/subdir")
1318        (response, content) = self.http.request(uri, "GET")
1319        self.assertEqual(response.status, 200)
1320
1321    def testDigestAuth(self):
1322        # Test that we support Digest Authentication
1323        uri = urlparse.urljoin(base, "digest/")
1324        (response, content) = self.http.request(uri, "GET")
1325        self.assertEqual(response.status, 401)
1326
1327        self.http.add_credentials("joe", "password")
1328        (response, content) = self.http.request(uri, "GET")
1329        self.assertEqual(response.status, 200)
1330
1331        uri = urlparse.urljoin(base, "digest/file.txt")
1332        (response, content) = self.http.request(uri, "GET")
1333
1334    def testDigestAuthNextNonceAndNC(self):
1335        # Test that if the server sets nextnonce that we reset
1336        # the nonce count back to 1
1337        uri = urlparse.urljoin(base, "digest/file.txt")
1338        self.http.add_credentials("joe", "password")
1339        (response, content) = self.http.request(
1340            uri, "GET", headers={"cache-control": "no-cache"}
1341        )
1342        info = httplib2._parse_www_authenticate(response, "authentication-info")
1343        self.assertEqual(response.status, 200)
1344        (response, content) = self.http.request(
1345            uri, "GET", headers={"cache-control": "no-cache"}
1346        )
1347        info2 = httplib2._parse_www_authenticate(response, "authentication-info")
1348        self.assertEqual(response.status, 200)
1349
1350        if "nextnonce" in info:
1351            self.assertEqual(info2["nc"], 1)
1352
1353    def testDigestAuthStale(self):
1354        # Test that we can handle a nonce becoming stale
1355        uri = urlparse.urljoin(base, "digest-expire/file.txt")
1356        self.http.add_credentials("joe", "password")
1357        (response, content) = self.http.request(
1358            uri, "GET", headers={"cache-control": "no-cache"}
1359        )
1360        info = httplib2._parse_www_authenticate(response, "authentication-info")
1361        self.assertEqual(response.status, 200)
1362
1363        time.sleep(3)
1364        # Sleep long enough that the nonce becomes stale
1365
1366        (response, content) = self.http.request(
1367            uri, "GET", headers={"cache-control": "no-cache"}
1368        )
1369        self.assertFalse(response.fromcache)
1370        self.assertTrue(response._stale_digest)
1371        info3 = httplib2._parse_www_authenticate(response, "authentication-info")
1372        self.assertEqual(response.status, 200)
1373
1374    def reflector(self, content):
1375        return dict([tuple(x.split("=", 1)) for x in content.strip().split("\n")])
1376
1377    def testReflector(self):
1378        uri = urlparse.urljoin(base, "reflector/reflector.cgi")
1379        (response, content) = self.http.request(uri, "GET")
1380        d = self.reflector(content)
1381        self.assertTrue("HTTP_USER_AGENT" in d)
1382
1383    def testConnectionClose(self):
1384        uri = "http://www.google.com/"
1385        (response, content) = self.http.request(uri, "GET")
1386        for c in self.http.connections.values():
1387            self.assertNotEqual(None, c.sock)
1388        (response, content) = self.http.request(
1389            uri, "GET", headers={"connection": "close"}
1390        )
1391        for c in self.http.connections.values():
1392            self.assertEqual(None, c.sock)
1393
1394    def testPickleHttp(self):
1395        pickled_http = pickle.dumps(self.http)
1396        new_http = pickle.loads(pickled_http)
1397
1398        self.assertEqual(
1399            sorted(new_http.__dict__.keys()), sorted(self.http.__dict__.keys())
1400        )
1401        for key in new_http.__dict__:
1402            if key in ("certificates", "credentials"):
1403                self.assertEqual(
1404                    new_http.__dict__[key].credentials,
1405                    self.http.__dict__[key].credentials,
1406                )
1407            elif key == "cache":
1408                self.assertEqual(
1409                    new_http.__dict__[key].cache, self.http.__dict__[key].cache
1410                )
1411            else:
1412                self.assertEqual(new_http.__dict__[key], self.http.__dict__[key])
1413
1414    def testPickleHttpWithConnection(self):
1415        self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
1416        pickled_http = pickle.dumps(self.http)
1417        new_http = pickle.loads(pickled_http)
1418
1419        self.assertEqual(self.http.connections.keys(), ["http:bitworking.org"])
1420        self.assertEqual(new_http.connections, {})
1421
1422    def testPickleCustomRequestHttp(self):
1423        def dummy_request(*args, **kwargs):
1424            return new_request(*args, **kwargs)
1425
1426        dummy_request.dummy_attr = "dummy_value"
1427
1428        self.http.request = dummy_request
1429        pickled_http = pickle.dumps(self.http)
1430        self.assertFalse("S'request'" in pickled_http)
1431
1432
1433try:
1434    import memcache
1435
1436    class HttpTestMemCached(HttpTest):
1437        def setUp(self):
1438            self.cache = memcache.Client(["127.0.0.1:11211"], debug=0)
1439            # self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
1440            self.http = httplib2.Http(self.cache)
1441            self.cache.flush_all()
1442            # Not exactly sure why the sleep is needed here, but
1443            # if not present then some unit tests that rely on caching
1444            # fail. Memcached seems to lose some sets immediately
1445            # after a flush_all if the set is to a value that
1446            # was previously cached. (Maybe the flush is handled async?)
1447            time.sleep(1)
1448            self.http.clear_credentials()
1449
1450
1451except:
1452    pass
1453
1454# ------------------------------------------------------------------------
1455
1456
1457class HttpPrivateTest(unittest.TestCase):
1458    def testParseCacheControl(self):
1459        # Test that we can parse the Cache-Control header
1460        self.assertEqual({}, httplib2._parse_cache_control({}))
1461        self.assertEqual(
1462            {"no-cache": 1},
1463            httplib2._parse_cache_control({"cache-control": " no-cache"}),
1464        )
1465        cc = httplib2._parse_cache_control(
1466            {"cache-control": " no-cache, max-age = 7200"}
1467        )
1468        self.assertEqual(cc["no-cache"], 1)
1469        self.assertEqual(cc["max-age"], "7200")
1470        cc = httplib2._parse_cache_control({"cache-control": " , "})
1471        self.assertEqual(cc[""], 1)
1472
1473        try:
1474            cc = httplib2._parse_cache_control(
1475                {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"}
1476            )
1477            self.assertTrue("max-age" in cc)
1478        except:
1479            self.fail("Should not throw exception")
1480
1481    def testNormalizeHeaders(self):
1482        # Test that we normalize headers to lowercase
1483        h = httplib2._normalize_headers({"Cache-Control": "no-cache", "Other": "Stuff"})
1484        self.assertTrue("cache-control" in h)
1485        self.assertTrue("other" in h)
1486        self.assertEqual("Stuff", h["other"])
1487
1488    def testExpirationModelTransparent(self):
1489        # Test that no-cache makes our request TRANSPARENT
1490        response_headers = {"cache-control": "max-age=7200"}
1491        request_headers = {"cache-control": "no-cache"}
1492        self.assertEqual(
1493            "TRANSPARENT",
1494            httplib2._entry_disposition(response_headers, request_headers),
1495        )
1496
1497    def testMaxAgeNonNumeric(self):
1498        # Test that no-cache makes our request TRANSPARENT
1499        response_headers = {"cache-control": "max-age=fred, min-fresh=barney"}
1500        request_headers = {}
1501        self.assertEqual(
1502            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1503        )
1504
1505    def testExpirationModelNoCacheResponse(self):
1506        # The date and expires point to an entry that should be
1507        # FRESH, but the no-cache over-rides that.
1508        now = time.time()
1509        response_headers = {
1510            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1511            "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
1512            "cache-control": "no-cache",
1513        }
1514        request_headers = {}
1515        self.assertEqual(
1516            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1517        )
1518
1519    def testExpirationModelStaleRequestMustReval(self):
1520        # must-revalidate forces STALE
1521        self.assertEqual(
1522            "STALE",
1523            httplib2._entry_disposition({}, {"cache-control": "must-revalidate"}),
1524        )
1525
1526    def testExpirationModelStaleResponseMustReval(self):
1527        # must-revalidate forces STALE
1528        self.assertEqual(
1529            "STALE",
1530            httplib2._entry_disposition({"cache-control": "must-revalidate"}, {}),
1531        )
1532
1533    def testExpirationModelFresh(self):
1534        response_headers = {
1535            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1536            "cache-control": "max-age=2",
1537        }
1538        request_headers = {}
1539        self.assertEqual(
1540            "FRESH", httplib2._entry_disposition(response_headers, request_headers)
1541        )
1542        time.sleep(3)
1543        self.assertEqual(
1544            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1545        )
1546
1547    def testExpirationMaxAge0(self):
1548        response_headers = {
1549            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1550            "cache-control": "max-age=0",
1551        }
1552        request_headers = {}
1553        self.assertEqual(
1554            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1555        )
1556
1557    def testExpirationModelDateAndExpires(self):
1558        now = time.time()
1559        response_headers = {
1560            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1561            "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
1562        }
1563        request_headers = {}
1564        self.assertEqual(
1565            "FRESH", httplib2._entry_disposition(response_headers, request_headers)
1566        )
1567        time.sleep(3)
1568        self.assertEqual(
1569            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1570        )
1571
1572    def testExpiresZero(self):
1573        now = time.time()
1574        response_headers = {
1575            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1576            "expires": "0",
1577        }
1578        request_headers = {}
1579        self.assertEqual(
1580            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1581        )
1582
1583    def testExpirationModelDateOnly(self):
1584        now = time.time()
1585        response_headers = {
1586            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3))
1587        }
1588        request_headers = {}
1589        self.assertEqual(
1590            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1591        )
1592
1593    def testExpirationModelOnlyIfCached(self):
1594        response_headers = {}
1595        request_headers = {"cache-control": "only-if-cached"}
1596        self.assertEqual(
1597            "FRESH", httplib2._entry_disposition(response_headers, request_headers)
1598        )
1599
1600    def testExpirationModelMaxAgeBoth(self):
1601        now = time.time()
1602        response_headers = {
1603            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1604            "cache-control": "max-age=2",
1605        }
1606        request_headers = {"cache-control": "max-age=0"}
1607        self.assertEqual(
1608            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1609        )
1610
1611    def testExpirationModelDateAndExpiresMinFresh1(self):
1612        now = time.time()
1613        response_headers = {
1614            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1615            "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)),
1616        }
1617        request_headers = {"cache-control": "min-fresh=2"}
1618        self.assertEqual(
1619            "STALE", httplib2._entry_disposition(response_headers, request_headers)
1620        )
1621
1622    def testExpirationModelDateAndExpiresMinFresh2(self):
1623        now = time.time()
1624        response_headers = {
1625            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1626            "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
1627        }
1628        request_headers = {"cache-control": "min-fresh=2"}
1629        self.assertEqual(
1630            "FRESH", httplib2._entry_disposition(response_headers, request_headers)
1631        )
1632
1633    def testParseWWWAuthenticateEmpty(self):
1634        res = httplib2._parse_www_authenticate({})
1635        self.assertEqual(len(res.keys()), 0)
1636
1637    def testParseWWWAuthenticate(self):
1638        # different uses of spaces around commas
1639        res = httplib2._parse_www_authenticate(
1640            {
1641                "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'
1642            }
1643        )
1644        self.assertEqual(len(res.keys()), 1)
1645        self.assertEqual(len(res["test"].keys()), 5)
1646
1647        # tokens with non-alphanum
1648        res = httplib2._parse_www_authenticate(
1649            {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'}
1650        )
1651        self.assertEqual(len(res.keys()), 1)
1652        self.assertEqual(len(res["t*!%#st"].keys()), 2)
1653
1654        # quoted string with quoted pairs
1655        res = httplib2._parse_www_authenticate(
1656            {"www-authenticate": 'Test realm="a \\"test\\" realm"'}
1657        )
1658        self.assertEqual(len(res.keys()), 1)
1659        self.assertEqual(res["test"]["realm"], 'a "test" realm')
1660
1661    def testParseWWWAuthenticateStrict(self):
1662        httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
1663        self.testParseWWWAuthenticate()
1664        httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
1665
1666    def testParseWWWAuthenticateBasic(self):
1667        res = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'})
1668        basic = res["basic"]
1669        self.assertEqual("me", basic["realm"])
1670
1671        res = httplib2._parse_www_authenticate(
1672            {"www-authenticate": 'Basic realm="me", algorithm="MD5"'}
1673        )
1674        basic = res["basic"]
1675        self.assertEqual("me", basic["realm"])
1676        self.assertEqual("MD5", basic["algorithm"])
1677
1678        res = httplib2._parse_www_authenticate(
1679            {"www-authenticate": 'Basic realm="me", algorithm=MD5'}
1680        )
1681        basic = res["basic"]
1682        self.assertEqual("me", basic["realm"])
1683        self.assertEqual("MD5", basic["algorithm"])
1684
1685    def testParseWWWAuthenticateBasic2(self):
1686        res = httplib2._parse_www_authenticate(
1687            {"www-authenticate": 'Basic realm="me",other="fred" '}
1688        )
1689        basic = res["basic"]
1690        self.assertEqual("me", basic["realm"])
1691        self.assertEqual("fred", basic["other"])
1692
1693    def testParseWWWAuthenticateBasic3(self):
1694        res = httplib2._parse_www_authenticate(
1695            {"www-authenticate": 'Basic REAlm="me" '}
1696        )
1697        basic = res["basic"]
1698        self.assertEqual("me", basic["realm"])
1699
1700    def testParseWWWAuthenticateDigest(self):
1701        res = httplib2._parse_www_authenticate(
1702            {
1703                "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'
1704            }
1705        )
1706        digest = res["digest"]
1707        self.assertEqual("testrealm@host.com", digest["realm"])
1708        self.assertEqual("auth,auth-int", digest["qop"])
1709
1710    def testParseWWWAuthenticateMultiple(self):
1711        res = httplib2._parse_www_authenticate(
1712            {
1713                "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '
1714            }
1715        )
1716        digest = res["digest"]
1717        self.assertEqual("testrealm@host.com", digest["realm"])
1718        self.assertEqual("auth,auth-int", digest["qop"])
1719        self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
1720        self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
1721        basic = res["basic"]
1722        self.assertEqual("me", basic["realm"])
1723
1724    def testParseWWWAuthenticateMultiple2(self):
1725        # Handle an added comma between challenges, which might get thrown in if the challenges were
1726        # originally sent in separate www-authenticate headers.
1727        res = httplib2._parse_www_authenticate(
1728            {
1729                "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '
1730            }
1731        )
1732        digest = res["digest"]
1733        self.assertEqual("testrealm@host.com", digest["realm"])
1734        self.assertEqual("auth,auth-int", digest["qop"])
1735        self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
1736        self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
1737        basic = res["basic"]
1738        self.assertEqual("me", basic["realm"])
1739
1740    def testParseWWWAuthenticateMultiple3(self):
1741        # Handle an added comma between challenges, which might get thrown in if the challenges were
1742        # originally sent in separate www-authenticate headers.
1743        res = httplib2._parse_www_authenticate(
1744            {
1745                "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
1746            }
1747        )
1748        digest = res["digest"]
1749        self.assertEqual("testrealm@host.com", digest["realm"])
1750        self.assertEqual("auth,auth-int", digest["qop"])
1751        self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"])
1752        self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"])
1753        basic = res["basic"]
1754        self.assertEqual("me", basic["realm"])
1755        wsse = res["wsse"]
1756        self.assertEqual("foo", wsse["realm"])
1757        self.assertEqual("UsernameToken", wsse["profile"])
1758
1759    def testParseWWWAuthenticateMultiple4(self):
1760        res = httplib2._parse_www_authenticate(
1761            {
1762                "www-authenticate": 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
1763            }
1764        )
1765        digest = res["digest"]
1766        self.assertEqual("test-real.m@host.com", digest["realm"])
1767        self.assertEqual("\tauth,auth-int", digest["qop"])
1768        self.assertEqual("(*)&^&$%#", digest["nonce"])
1769
1770    def testParseWWWAuthenticateMoreQuoteCombos(self):
1771        res = httplib2._parse_www_authenticate(
1772            {
1773                "www-authenticate": 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1774            }
1775        )
1776        digest = res["digest"]
1777        self.assertEqual("myrealm", digest["realm"])
1778
1779    def testParseWWWAuthenticateMalformed(self):
1780        try:
1781            res = httplib2._parse_www_authenticate(
1782                {
1783                    "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'
1784                }
1785            )
1786            self.fail("should raise an exception")
1787        except httplib2.MalformedHeader:
1788            pass
1789
1790    def testDigestObject(self):
1791        credentials = ("joe", "password")
1792        host = None
1793        request_uri = "/projects/httplib2/test/digest/"
1794        headers = {}
1795        response = {
1796            "www-authenticate": 'Digest realm="myrealm", '
1797            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
1798            'algorithm=MD5, qop="auth"'
1799        }
1800        content = ""
1801
1802        d = httplib2.DigestAuthentication(
1803            credentials, host, request_uri, headers, response, content, None
1804        )
1805        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
1806        our_request = "authorization: %s" % headers["authorization"]
1807        working_request = (
1808            'authorization: Digest username="joe", realm="myrealm", '
1809            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
1810            ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
1811            'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
1812            'nc=00000001, cnonce="33033375ec278a46"'
1813        )
1814        self.assertEqual(our_request, working_request)
1815
1816    def testDigestObjectWithOpaque(self):
1817        credentials = ("joe", "password")
1818        host = None
1819        request_uri = "/projects/httplib2/test/digest/"
1820        headers = {}
1821        response = {
1822            "www-authenticate": 'Digest realm="myrealm", '
1823            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
1824            'algorithm=MD5, qop="auth", opaque="atestopaque"'
1825        }
1826        content = ""
1827
1828        d = httplib2.DigestAuthentication(
1829            credentials, host, request_uri, headers, response, content, None
1830        )
1831        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
1832        our_request = "authorization: %s" % headers["authorization"]
1833        working_request = (
1834            'authorization: Digest username="joe", realm="myrealm", '
1835            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
1836            ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
1837            'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
1838            'nc=00000001, cnonce="33033375ec278a46", '
1839            'opaque="atestopaque"'
1840        )
1841        self.assertEqual(our_request, working_request)
1842
1843    def testDigestObjectStale(self):
1844        credentials = ("joe", "password")
1845        host = None
1846        request_uri = "/projects/httplib2/test/digest/"
1847        headers = {}
1848        response = httplib2.Response({})
1849        response["www-authenticate"] = (
1850            'Digest realm="myrealm", '
1851            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
1852            ' algorithm=MD5, qop="auth", stale=true'
1853        )
1854        response.status = 401
1855        content = ""
1856        d = httplib2.DigestAuthentication(
1857            credentials, host, request_uri, headers, response, content, None
1858        )
1859        # Returns true to force a retry
1860        self.assertTrue(d.response(response, content))
1861
1862    def testDigestObjectAuthInfo(self):
1863        credentials = ("joe", "password")
1864        host = None
1865        request_uri = "/projects/httplib2/test/digest/"
1866        headers = {}
1867        response = httplib2.Response({})
1868        response["www-authenticate"] = (
1869            'Digest realm="myrealm", '
1870            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
1871            ' algorithm=MD5, qop="auth", stale=true'
1872        )
1873        response["authentication-info"] = 'nextnonce="fred"'
1874        content = ""
1875        d = httplib2.DigestAuthentication(
1876            credentials, host, request_uri, headers, response, content, None
1877        )
1878        # Returns true to force a retry
1879        self.assertFalse(d.response(response, content))
1880        self.assertEqual("fred", d.challenge["nonce"])
1881        self.assertEqual(1, d.challenge["nc"])
1882
1883    def testWsseAlgorithm(self):
1884        digest = httplib2._wsse_username_token(
1885            "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm"
1886        )
1887        expected = "quR/EWLAV4xLf9Zqyw4pDmfV9OY="
1888        self.assertEqual(expected, digest)
1889
1890    def testEnd2End(self):
1891        # one end to end header
1892        response = {"content-type": "application/atom+xml", "te": "deflate"}
1893        end2end = httplib2._get_end2end_headers(response)
1894        self.assertTrue("content-type" in end2end)
1895        self.assertTrue("te" not in end2end)
1896        self.assertTrue("connection" not in end2end)
1897
1898        # one end to end header that gets eliminated
1899        response = {
1900            "connection": "content-type",
1901            "content-type": "application/atom+xml",
1902            "te": "deflate",
1903        }
1904        end2end = httplib2._get_end2end_headers(response)
1905        self.assertTrue("content-type" not in end2end)
1906        self.assertTrue("te" not in end2end)
1907        self.assertTrue("connection" not in end2end)
1908
1909        # Degenerate case of no headers
1910        response = {}
1911        end2end = httplib2._get_end2end_headers(response)
1912        self.assertEquals(0, len(end2end))
1913
1914        # Degenerate case of connection referrring to a header not passed in
1915        response = {"connection": "content-type"}
1916        end2end = httplib2._get_end2end_headers(response)
1917        self.assertEquals(0, len(end2end))
1918
1919
1920class TestProxyInfo(unittest.TestCase):
1921    def setUp(self):
1922        self.orig_env = dict(os.environ)
1923
1924    def tearDown(self):
1925        os.environ.clear()
1926        os.environ.update(self.orig_env)
1927
1928    def test_from_url(self):
1929        pi = httplib2.proxy_info_from_url("http://myproxy.example.com")
1930        self.assertEquals(pi.proxy_host, "myproxy.example.com")
1931        self.assertEquals(pi.proxy_port, 80)
1932        self.assertEquals(pi.proxy_user, None)
1933
1934    def test_from_url_ident(self):
1935        pi = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99")
1936        self.assertEquals(pi.proxy_host, "someproxy")
1937        self.assertEquals(pi.proxy_port, 99)
1938        self.assertEquals(pi.proxy_user, "zoidberg")
1939        self.assertEquals(pi.proxy_pass, "fish")
1940
1941    def test_from_env(self):
1942        os.environ["http_proxy"] = "http://myproxy.example.com:8080"
1943        pi = httplib2.proxy_info_from_environment()
1944        self.assertEquals(pi.proxy_host, "myproxy.example.com")
1945        self.assertEquals(pi.proxy_port, 8080)
1946        self.assertEquals(pi.bypass_hosts, [])
1947
1948    def test_from_env_no_proxy(self):
1949        os.environ["http_proxy"] = "http://myproxy.example.com:80"
1950        os.environ["https_proxy"] = "http://myproxy.example.com:81"
1951        os.environ["no_proxy"] = "localhost,otherhost.domain.local"
1952        pi = httplib2.proxy_info_from_environment("https")
1953        self.assertEquals(pi.proxy_host, "myproxy.example.com")
1954        self.assertEquals(pi.proxy_port, 81)
1955        self.assertEquals(pi.bypass_hosts, ["localhost", "otherhost.domain.local"])
1956
1957    def test_from_env_none(self):
1958        os.environ.clear()
1959        pi = httplib2.proxy_info_from_environment()
1960        self.assertEquals(pi, None)
1961
1962    def test_applies_to(self):
1963        os.environ["http_proxy"] = "http://myproxy.example.com:80"
1964        os.environ["https_proxy"] = "http://myproxy.example.com:81"
1965        os.environ["no_proxy"] = "localhost,otherhost.domain.local,example.com"
1966        pi = httplib2.proxy_info_from_environment()
1967        self.assertFalse(pi.applies_to("localhost"))
1968        self.assertTrue(pi.applies_to("www.google.com"))
1969        self.assertFalse(pi.applies_to("www.example.com"))
1970
1971    def test_no_proxy_star(self):
1972        os.environ["http_proxy"] = "http://myproxy.example.com:80"
1973        os.environ["NO_PROXY"] = "*"
1974        pi = httplib2.proxy_info_from_environment()
1975        for host in ("localhost", "169.254.38.192", "www.google.com"):
1976            self.assertFalse(pi.applies_to(host))
1977
1978    def test_proxy_headers(self):
1979        headers = {"key0": "val0", "key1": "val1"}
1980        pi = httplib2.ProxyInfo(
1981            httplib2.socks.PROXY_TYPE_HTTP, "localhost", 1234, proxy_headers=headers
1982        )
1983        self.assertEquals(pi.proxy_headers, headers)
1984
1985
1986if __name__ == "__main__":
1987    unittest.main()
1988