# (c) 2005 Ian Bicking, Clark C. Evans and contributors
# This module is part of the Python Paste Project and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
import time
import random
import os
import shutil
import tempfile
try:
    # Python 3
    from email.utils import parsedate_tz, mktime_tz
except ImportError:
    # Python 2
    from rfc822 import parsedate_tz, mktime_tz
import six

from paste import fileapp
from paste.fileapp import *
from paste.fixture import *

# NOTE(haypo): don't use string.letters because the order of lower and upper
# case letters changes when locale.setlocale() is called for the first time
LETTERS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'


def _letters(repeat):
    """Return LETTERS repeated ``repeat`` times as a byte string."""
    content = LETTERS * repeat
    if six.PY3:
        content = content.encode('utf8')
    return content


def _expires(res):
    """Return the ``Expires`` header of ``res`` as a POSIX timestamp."""
    return mktime_tz(parsedate_tz(res.header('expires')))


def test_data():
    """DataApp serves its content and picks up content set later."""
    harness = TestApp(DataApp(b'mycontent'))
    res = harness.get("/")
    assert 'application/octet-stream' == res.header('content-type')
    assert '9' == res.header('content-length')
    assert "<Response 200 OK 'mycontent'>" == repr(res)
    harness.app.set_content(b"bingles")
    assert "<Response 200 OK 'bingles'>" == repr(harness.get("/"))


def test_cache():
    """cache_control() is reflected in Cache-Control and Expires headers."""
    def build(*args, **kwargs):
        app = DataApp(b"SomeContent")
        app.cache_control(*args, **kwargs)
        return TestApp(app).get("/")

    res = build()
    assert 'public' == res.header('cache-control')
    assert not res.header('expires', None)

    res = build(private=True)
    assert 'private' == res.header('cache-control')
    assert _expires(res) < time.time()

    res = build(no_cache=True)
    assert 'no-cache' == res.header('cache-control')
    assert _expires(res) < time.time()

    res = build(max_age=60, s_maxage=30)
    assert 'public, max-age=60, s-maxage=30' == res.header('cache-control')
    expires = _expires(res)
    assert time.time() + 58 < expires < time.time() + 61

    res = build(private=True, max_age=60, no_transform=True, no_store=True)
    assert 'private, no-store, no-transform, max-age=60' == \
        res.header('cache-control')
    assert _expires(res) < time.time()


def test_disposition():
    """content_disposition() sets Content-Disposition and Content-Type."""
    def build(*args, **kwargs):
        app = DataApp(b"SomeContent")
        app.content_disposition(*args, **kwargs)
        return TestApp(app).get("/")

    res = build()
    assert 'attachment' == res.header('content-disposition')
    assert 'application/octet-stream' == res.header('content-type')

    res = build(filename="bing.txt")
    assert 'attachment; filename="bing.txt"' == \
        res.header('content-disposition')
    assert 'text/plain' == res.header('content-type')

    res = build(inline=True)
    assert 'inline' == res.header('content-disposition')
    assert 'application/octet-stream' == res.header('content-type')

    res = build(inline=True, filename="/some/path/bing.txt")
    assert 'inline; filename="bing.txt"' == \
        res.header('content-disposition')
    assert 'text/plain' == res.header('content-type')

    # Asking for both inline and attachment is expected to be rejected.
    try:
        build(inline=True, attachment=True)
    except AssertionError:
        pass
    else:
        assert False, "should be an exception"


def test_modified():
    """If-Modified-Since handling: 304 on match, 400 on bad timestamps."""
    harness = TestApp(DataApp(b'mycontent'))
    res = harness.get("/")
    assert "<Response 200 OK 'mycontent'>" == repr(res)
    last_modified = res.header('last-modified')

    res = harness.get("/", headers={'if-modified-since': last_modified})
    assert "<Response 304 Not Modified ''>" == repr(res)

    # A trailing "; length=..." parameter on the header value is accepted.
    res = harness.get("/", headers={'if-modified-since': last_modified +
                                    '; length=1506'})
    assert "<Response 304 Not Modified ''>" == repr(res)

    res = harness.get("/", status=400,
                      headers={'if-modified-since': 'garbage'})
    assert 400 == res.status
    assert b"ill-formed timestamp" in res.body

    res = harness.get("/", status=400,
                      headers={'if-modified-since':
                               'Thu, 22 Dec 2030 01:01:01 GMT'})
    assert 400 == res.status
    assert b"check your system clock" in res.body


def test_file():
    """FileApp serves a file, notices updates and stops caching big files."""
    # Named ``path`` so the imported ``tempfile`` module is not shadowed.
    path = "test_fileapp.%s.txt" % (random.random())
    content = _letters(20)
    with open(path, "wb") as fp:
        fp.write(content)
    try:
        app = fileapp.FileApp(path)
        res = TestApp(app).get("/")
        assert len(content) == int(res.header('content-length'))
        assert 'text/plain' == res.header('content-type')
        assert content == res.body
        assert content == app.content  # this is cached
        assert res.header('last-modified')

        print("updating", path)
        # Append in binary mode, matching the initial write and the
        # byte-string comparisons below.
        with open(path, "ab") as fp:
            fp.write(b"0123456789")
        res = TestApp(app).get("/", headers={'Cache-Control': 'max-age=0'})
        assert len(content) + 10 == int(res.header('content-length'))
        assert 'text/plain' == res.header('content-type')
        assert content + b"0123456789" == res.body
        assert app.content  # we are still cached

        with open(path, "ab") as fp:
            fp.write(b"X" * fileapp.CACHE_SIZE)  # exceed the cache size
            fp.write(b"YZ")
        res = TestApp(app).get("/", headers={'Cache-Control': 'max-age=0'})
        # 12 = the ten digits appended earlier plus the trailing "YZ".
        newsize = fileapp.CACHE_SIZE + len(content) + 12
        assert newsize == int(res.header('content-length'))
        assert newsize == len(res.body)
        assert res.body.startswith(content)
        assert res.body.endswith(b'XYZ')
        assert not app.content  # we are no longer cached
    finally:
        os.unlink(path)


def test_dir():
    """DirectoryApp serves plain files and refuses everything else."""
    tmpdir = tempfile.mkdtemp()
    try:
        tmpfile = os.path.join(tmpdir, 'file')
        tmpsubdir = os.path.join(tmpdir, 'dir')
        with open(tmpfile, 'w') as fp:
            fp.write('abcd')
        os.mkdir(tmpsubdir)

        app = fileapp.DirectoryApp(tmpdir)
        for path in ['/', '', '//', '/..', '/.', '/../..']:
            assert TestApp(app).get(path, status=403).status == 403, \
                'expected 403 for path %r' % (path,)
        for path in ['/~', '/foo', '/dir', '/dir/']:
            assert TestApp(app).get(path, status=404).status == 404, \
                'expected 404 for path %r' % (path,)
        assert TestApp(app).get('/file').body == b'abcd'
    finally:
        # Remove the whole tree so a failure part-way through setup cannot
        # leave the temporary directory behind.
        shutil.rmtree(tmpdir)


def _excercize_range(build, content):
    """Run the shared Range-header assertions against ``build``.

    ``build`` is called as ``build(range_value)`` or
    ``build(range_value, status=...)`` and must return the response;
    ``content`` is the full body being served, as a byte string. The
    last byte of ``content`` must be ``Z``.
    """
    # full content request, but using ranges
    res = build("bytes=0-%d" % (len(content) - 1))
    assert res.header('accept-ranges') == 'bytes'
    assert res.body == content
    assert res.header('content-length') == str(len(content))
    res = build("bytes=-%d" % (len(content) - 1))
    assert res.body == content
    assert res.header('content-length') == str(len(content))
    res = build("bytes=0-")
    assert res.body == content
    assert res.header('content-length') == str(len(content))
    # partial content requests
    res = build("bytes=0-9", status=206)
    assert res.body == content[:10]
    assert res.header('content-length') == '10'
    res = build("bytes=%d-" % (len(content) - 1), status=206)
    assert res.body == b'Z'
    assert res.header('content-length') == '1'
    res = build("bytes=%d-%d" % (3, 17), status=206)
    assert res.body == content[3:18]
    assert res.header('content-length') == '15'


def test_range():
    """Range requests against an in-memory DataApp."""
    content = _letters(5)

    def build(range_header, status=206):
        app = DataApp(content)
        return TestApp(app).get("/", headers={'Range': range_header},
                                status=status)

    _excercize_range(build, content)
    # A range ending past the end of the content is expected to give 416.
    build('bytes=0-%d' % (len(content) + 1), 416)


def test_file_range():
    """Range requests against a FileApp, with several block sizes."""
    path = "test_fileapp.%s.txt" % (random.random())
    content = _letters(1 + (fileapp.CACHE_SIZE // len(LETTERS)))
    assert len(content) > fileapp.CACHE_SIZE
    # Remember the module-level block size so it can be restored; leaving
    # it modified would leak into every test that runs afterwards.
    original_block_size = fileapp.BLOCK_SIZE
    with open(path, "wb") as fp:
        fp.write(content)
    try:
        def build(range_header, status=206):
            app = fileapp.FileApp(path)
            return TestApp(app).get("/", headers={'Range': range_header},
                                    status=status)

        _excercize_range(build, content)
        for size in (13, len(LETTERS), len(LETTERS) - 1):
            fileapp.BLOCK_SIZE = size
            _excercize_range(build, content)
    finally:
        fileapp.BLOCK_SIZE = original_block_size
        os.unlink(path)


def test_file_cache():
    """Conditional GETs (If-Modified-Since / If-None-Match) on a FileApp."""
    filename = os.path.join(os.path.dirname(__file__),
                            'urlparser_data', 'secured.txt')
    app = TestApp(fileapp.FileApp(filename))
    res = app.get('/')
    etag = res.header('ETag')
    last_mod = res.header('Last-Modified')
    # The expected status is checked by ``app.get`` through ``status=``.
    app.get('/', headers={'If-Modified-Since': last_mod},
            status=304)
    app.get('/', headers={'If-None-Match': etag},
            status=304)
    app.get('/', headers={'If-None-Match': 'asdf'},
            status=200)
    app.get('/', headers={'If-Modified-Since': 'Sat, 1 Jan 2005 12:00:00 GMT'},
            status=200)
    app.get('/', headers={'If-Modified-Since': last_mod + '; length=100'},
            status=304)
    app.get('/', headers={'If-Modified-Since': 'invalid date'},
            status=400)


def test_methods():
    """HEAD returns GET's headers with no body; POST is refused with 405."""
    filename = os.path.join(os.path.dirname(__file__),
                            'urlparser_data', 'secured.txt')
    app = TestApp(fileapp.FileApp(filename))
    get_res = app.get('')
    res = app.get('', extra_environ={'REQUEST_METHOD': 'HEAD'})
    assert res.headers == get_res.headers
    assert not res.body
    app.post('', status=405)  # Method Not Allowed