1# (c) 2005 Ian Bicking, Clark C. Evans and contributors
2# This module is part of the Python Paste Project and is released under
3# the MIT License: http://www.opensource.org/licenses/mit-license.php
4import time
5import random
6import os
7import tempfile
8try:
9    # Python 3
10    from email.utils import parsedate_tz, mktime_tz
11except ImportError:
12    # Python 2
13    from rfc822 import parsedate_tz, mktime_tz
14import six
15
16from paste import fileapp
17from paste.fileapp import *
18from paste.fixture import *
19
20# NOTE(haypo): don't use string.letters because the order of lower and upper
21# case letters changes when locale.setlocale() is called for the first time
22LETTERS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
23
24def test_data():
25    harness = TestApp(DataApp(b'mycontent'))
26    res = harness.get("/")
27    assert 'application/octet-stream' == res.header('content-type')
28    assert '9' == res.header('content-length')
29    assert "<Response 200 OK 'mycontent'>" == repr(res)
30    harness.app.set_content(b"bingles")
31    assert "<Response 200 OK 'bingles'>" == repr(harness.get("/"))
32
33def test_cache():
34    def build(*args,**kwargs):
35        app = DataApp(b"SomeContent")
36        app.cache_control(*args,**kwargs)
37        return TestApp(app).get("/")
38    res = build()
39    assert 'public' == res.header('cache-control')
40    assert not res.header('expires',None)
41    res = build(private=True)
42    assert 'private' == res.header('cache-control')
43    assert mktime_tz(parsedate_tz(res.header('expires'))) < time.time()
44    res = build(no_cache=True)
45    assert 'no-cache' == res.header('cache-control')
46    assert mktime_tz(parsedate_tz(res.header('expires'))) < time.time()
47    res = build(max_age=60,s_maxage=30)
48    assert 'public, max-age=60, s-maxage=30' == res.header('cache-control')
49    expires = mktime_tz(parsedate_tz(res.header('expires')))
50    assert expires > time.time()+58 and expires < time.time()+61
51    res = build(private=True, max_age=60, no_transform=True, no_store=True)
52    assert 'private, no-store, no-transform, max-age=60' == \
53           res.header('cache-control')
54    expires = mktime_tz(parsedate_tz(res.header('expires')))
55    assert mktime_tz(parsedate_tz(res.header('expires'))) < time.time()
56
57def test_disposition():
58    def build(*args,**kwargs):
59        app = DataApp(b"SomeContent")
60        app.content_disposition(*args,**kwargs)
61        return TestApp(app).get("/")
62    res = build()
63    assert 'attachment' == res.header('content-disposition')
64    assert 'application/octet-stream' == res.header('content-type')
65    res = build(filename="bing.txt")
66    assert 'attachment; filename="bing.txt"' == \
67            res.header('content-disposition')
68    assert 'text/plain' == res.header('content-type')
69    res = build(inline=True)
70    assert 'inline' == res.header('content-disposition')
71    assert 'application/octet-stream' == res.header('content-type')
72    res = build(inline=True, filename="/some/path/bing.txt")
73    assert 'inline; filename="bing.txt"' == \
74            res.header('content-disposition')
75    assert 'text/plain' == res.header('content-type')
76    try:
77       res = build(inline=True,attachment=True)
78    except AssertionError:
79        pass
80    else:
81        assert False, "should be an exception"
82
83def test_modified():
84    harness = TestApp(DataApp(b'mycontent'))
85    res = harness.get("/")
86    assert "<Response 200 OK 'mycontent'>" == repr(res)
87    last_modified = res.header('last-modified')
88    res = harness.get("/",headers={'if-modified-since': last_modified})
89    assert "<Response 304 Not Modified ''>" == repr(res)
90    res = harness.get("/",headers={'if-modified-since': last_modified + \
91                                   '; length=1506'})
92    assert "<Response 304 Not Modified ''>" == repr(res)
93    res = harness.get("/",status=400,
94            headers={'if-modified-since': 'garbage'})
95    assert 400 == res.status and b"ill-formed timestamp" in res.body
96    res = harness.get("/",status=400,
97            headers={'if-modified-since':
98                'Thu, 22 Dec 2030 01:01:01 GMT'})
99    assert 400 == res.status and b"check your system clock" in res.body
100
101def test_file():
102    tempfile = "test_fileapp.%s.txt" % (random.random())
103    content = LETTERS * 20
104    if six.PY3:
105        content = content.encode('utf8')
106    with open(tempfile, "wb") as fp:
107        fp.write(content)
108    try:
109        app = fileapp.FileApp(tempfile)
110        res = TestApp(app).get("/")
111        assert len(content) == int(res.header('content-length'))
112        assert 'text/plain' == res.header('content-type')
113        assert content == res.body
114        assert content == app.content  # this is cashed
115        lastmod = res.header('last-modified')
116        print("updating", tempfile)
117        file = open(tempfile,"a+")
118        file.write("0123456789")
119        file.close()
120        res = TestApp(app).get("/",headers={'Cache-Control': 'max-age=0'})
121        assert len(content)+10 == int(res.header('content-length'))
122        assert 'text/plain' == res.header('content-type')
123        assert content + b"0123456789" == res.body
124        assert app.content # we are still cached
125        file = open(tempfile,"a+")
126        file.write("X" * fileapp.CACHE_SIZE) # exceed the cashe size
127        file.write("YZ")
128        file.close()
129        res = TestApp(app).get("/",headers={'Cache-Control': 'max-age=0'})
130        newsize = fileapp.CACHE_SIZE + len(content)+12
131        assert newsize == int(res.header('content-length'))
132        assert newsize == len(res.body)
133        assert res.body.startswith(content) and res.body.endswith(b'XYZ')
134        assert not app.content # we are no longer cached
135    finally:
136        os.unlink(tempfile)
137
138def test_dir():
139    tmpdir = tempfile.mkdtemp()
140    try:
141        tmpfile = os.path.join(tmpdir, 'file')
142        tmpsubdir = os.path.join(tmpdir, 'dir')
143        fp = open(tmpfile, 'w')
144        fp.write('abcd')
145        fp.close()
146        os.mkdir(tmpsubdir)
147        try:
148            app = fileapp.DirectoryApp(tmpdir)
149            for path in ['/', '', '//', '/..', '/.', '/../..']:
150                assert TestApp(app).get(path, status=403).status == 403, ValueError(path)
151            for path in ['/~', '/foo', '/dir', '/dir/']:
152                assert TestApp(app).get(path, status=404).status == 404, ValueError(path)
153            assert TestApp(app).get('/file').body == b'abcd'
154        finally:
155            os.remove(tmpfile)
156            os.rmdir(tmpsubdir)
157    finally:
158        os.rmdir(tmpdir)
159
160def _excercize_range(build,content):
161    # full content request, but using ranges'
162    res = build("bytes=0-%d" % (len(content)-1))
163    assert res.header('accept-ranges') == 'bytes'
164    assert res.body == content
165    assert res.header('content-length') == str(len(content))
166    res = build("bytes=-%d" % (len(content)-1))
167    assert res.body == content
168    assert res.header('content-length') == str(len(content))
169    res = build("bytes=0-")
170    assert res.body == content
171    assert res.header('content-length') == str(len(content))
172    # partial content requests
173    res = build("bytes=0-9", status=206)
174    assert res.body == content[:10]
175    assert res.header('content-length') == '10'
176    res = build("bytes=%d-" % (len(content)-1), status=206)
177    assert res.body == b'Z'
178    assert res.header('content-length') == '1'
179    res = build("bytes=%d-%d" % (3,17), status=206)
180    assert res.body == content[3:18]
181    assert res.header('content-length') == '15'
182
183def test_range():
184    content = LETTERS * 5
185    if six.PY3:
186        content = content.encode('utf8')
187    def build(range, status=206):
188        app = DataApp(content)
189        return TestApp(app).get("/",headers={'Range': range}, status=status)
190    _excercize_range(build,content)
191    build('bytes=0-%d' % (len(content)+1), 416)
192
193def test_file_range():
194    tempfile = "test_fileapp.%s.txt" % (random.random())
195    content = LETTERS * (1+(fileapp.CACHE_SIZE // len(LETTERS)))
196    if six.PY3:
197        content = content.encode('utf8')
198    assert len(content) > fileapp.CACHE_SIZE
199    with open(tempfile, "wb") as fp:
200        fp.write(content)
201    try:
202        def build(range, status=206):
203            app = fileapp.FileApp(tempfile)
204            return TestApp(app).get("/",headers={'Range': range},
205                                        status=status)
206        _excercize_range(build,content)
207        for size in (13,len(LETTERS), len(LETTERS)-1):
208            fileapp.BLOCK_SIZE = size
209            _excercize_range(build,content)
210    finally:
211        os.unlink(tempfile)
212
213def test_file_cache():
214    filename = os.path.join(os.path.dirname(__file__),
215                            'urlparser_data', 'secured.txt')
216    app = TestApp(fileapp.FileApp(filename))
217    res = app.get('/')
218    etag = res.header('ETag')
219    last_mod = res.header('Last-Modified')
220    res = app.get('/', headers={'If-Modified-Since': last_mod},
221                  status=304)
222    res = app.get('/', headers={'If-None-Match': etag},
223                  status=304)
224    res = app.get('/', headers={'If-None-Match': 'asdf'},
225                  status=200)
226    res = app.get('/', headers={'If-Modified-Since': 'Sat, 1 Jan 2005 12:00:00 GMT'},
227                  status=200)
228    res = app.get('/', headers={'If-Modified-Since': last_mod + '; length=100'},
229                  status=304)
230    res = app.get('/', headers={'If-Modified-Since': 'invalid date'},
231                  status=400)
232
233def test_methods():
234    filename = os.path.join(os.path.dirname(__file__),
235                            'urlparser_data', 'secured.txt')
236    app = TestApp(fileapp.FileApp(filename))
237    get_res = app.get('')
238    res = app.get('', extra_environ={'REQUEST_METHOD': 'HEAD'})
239    assert res.headers == get_res.headers
240    assert not res.body
241    app.post('', status=405) # Method Not Allowed
242
243