1"""Test largefile support on system where this makes sense.
2"""
3
4from __future__ import print_function
5
6import os
7import stat
8import sys
9import unittest
10from test.test_support import run_unittest, TESTFN, verbose, requires, \
11                              unlink
12import io  # C implementation of io
13import _pyio as pyio # Python implementation of io
14
15try:
16    import signal
17    # The default handler for SIGXFSZ is to abort the process.
18    # By ignoring it, system calls exceeding the file size resource
19    # limit will raise IOError instead of crashing the interpreter.
20    oldhandler = signal.signal(signal.SIGXFSZ, signal.SIG_IGN)
21except (ImportError, AttributeError):
22    pass
23
24# create >2GB file (2GB = 2147483648 bytes)
25size = 2500000000
26
27
28class LargeFileTest(unittest.TestCase):
29    """Test that each file function works as expected for a large
30    (i.e. > 2GB, do  we have to check > 4GB) files.
31
32    NOTE: the order of execution of the test methods is important! test_seek
33    must run first to create the test file. File cleanup must also be handled
34    outside the test instances because of this.
35
36    """
37
38    def test_seek(self):
39        if verbose:
40            print('create large file via seek (may be sparse file) ...')
41        with self.open(TESTFN, 'wb') as f:
42            f.write(b'z')
43            f.seek(0)
44            f.seek(size)
45            f.write(b'a')
46            f.flush()
47            if verbose:
48                print('check file size with os.fstat')
49            self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1)
50
51    def test_osstat(self):
52        if verbose:
53            print('check file size with os.stat')
54        self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1)
55
56    def test_seek_read(self):
57        if verbose:
58            print('play around with seek() and read() with the built largefile')
59        with self.open(TESTFN, 'rb') as f:
60            self.assertEqual(f.tell(), 0)
61            self.assertEqual(f.read(1), b'z')
62            self.assertEqual(f.tell(), 1)
63            f.seek(0)
64            self.assertEqual(f.tell(), 0)
65            f.seek(0, 0)
66            self.assertEqual(f.tell(), 0)
67            f.seek(42)
68            self.assertEqual(f.tell(), 42)
69            f.seek(42, 0)
70            self.assertEqual(f.tell(), 42)
71            f.seek(42, 1)
72            self.assertEqual(f.tell(), 84)
73            f.seek(0, 1)
74            self.assertEqual(f.tell(), 84)
75            f.seek(0, 2)  # seek from the end
76            self.assertEqual(f.tell(), size + 1 + 0)
77            f.seek(-10, 2)
78            self.assertEqual(f.tell(), size + 1 - 10)
79            f.seek(-size-1, 2)
80            self.assertEqual(f.tell(), 0)
81            f.seek(size)
82            self.assertEqual(f.tell(), size)
83            # the 'a' that was written at the end of file above
84            self.assertEqual(f.read(1), b'a')
85            f.seek(-size-1, 1)
86            self.assertEqual(f.read(1), b'z')
87            self.assertEqual(f.tell(), 1)
88
89    def test_lseek(self):
90        if verbose:
91            print('play around with os.lseek() with the built largefile')
92        with self.open(TESTFN, 'rb') as f:
93            self.assertEqual(os.lseek(f.fileno(), 0, 0), 0)
94            self.assertEqual(os.lseek(f.fileno(), 42, 0), 42)
95            self.assertEqual(os.lseek(f.fileno(), 42, 1), 84)
96            self.assertEqual(os.lseek(f.fileno(), 0, 1), 84)
97            self.assertEqual(os.lseek(f.fileno(), 0, 2), size+1+0)
98            self.assertEqual(os.lseek(f.fileno(), -10, 2), size+1-10)
99            self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0)
100            self.assertEqual(os.lseek(f.fileno(), size, 0), size)
101            # the 'a' that was written at the end of file above
102            self.assertEqual(f.read(1), b'a')
103
104    def test_truncate(self):
105        if verbose:
106            print('try truncate')
107        with self.open(TESTFN, 'r+b') as f:
108            # this is already decided before start running the test suite
109            # but we do it anyway for extra protection
110            if not hasattr(f, 'truncate'):
111                raise unittest.SkipTest("open().truncate() not available on this system")
112            f.seek(0, 2)
113            # else we've lost track of the true size
114            self.assertEqual(f.tell(), size+1)
115            # Cut it back via seek + truncate with no argument.
116            newsize = size - 10
117            f.seek(newsize)
118            f.truncate()
119            self.assertEqual(f.tell(), newsize)  # else pointer moved
120            f.seek(0, 2)
121            self.assertEqual(f.tell(), newsize)  # else wasn't truncated
122            # Ensure that truncate(smaller than true size) shrinks
123            # the file.
124            newsize -= 1
125            f.seek(42)
126            f.truncate(newsize)
127            if self.new_io:
128                self.assertEqual(f.tell(), 42)
129            f.seek(0, 2)
130            self.assertEqual(f.tell(), newsize)
131            # XXX truncate(larger than true size) is ill-defined
132            # across platform; cut it waaaaay back
133            f.seek(0)
134            f.truncate(1)
135            if self.new_io:
136                self.assertEqual(f.tell(), 0)       # else pointer moved
137            f.seek(0)
138            self.assertEqual(len(f.read()), 1)  # else wasn't truncated
139
140    def test_seekable(self):
141        # Issue #5016; seekable() can return False when the current position
142        # is negative when truncated to an int.
143        if not self.new_io:
144            self.skipTest("builtin file doesn't have seekable()")
145        for pos in (2**31-1, 2**31, 2**31+1):
146            with self.open(TESTFN, 'rb') as f:
147                f.seek(pos)
148                self.assertTrue(f.seekable())
149
150
151def test_main():
152    # On Windows and Mac OSX this test comsumes large resources; It
153    # takes a long time to build the >2GB file and takes >2GB of disk
154    # space therefore the resource must be enabled to run this test.
155    # If not, nothing after this line stanza will be executed.
156    if sys.platform[:3] == 'win' or sys.platform == 'darwin':
157        requires('largefile',
158                 'test requires %s bytes and a long time to run' % str(size))
159    else:
160        # Only run if the current filesystem supports large files.
161        # (Skip this test on Windows, since we now always support
162        # large files.)
163        f = open(TESTFN, 'wb', buffering=0)
164        try:
165            # 2**31 == 2147483648
166            f.seek(2147483649)
167            # Seeking is not enough of a test: you must write and
168            # flush, too!
169            f.write(b'x')
170            f.flush()
171        except (IOError, OverflowError):
172            f.close()
173            unlink(TESTFN)
174            raise unittest.SkipTest("filesystem does not have largefile support")
175        else:
176            f.close()
177    suite = unittest.TestSuite()
178    for _open, prefix in [(io.open, 'C'), (pyio.open, 'Py'),
179                          (open, 'Builtin')]:
180        class TestCase(LargeFileTest):
181            pass
182        TestCase.open = staticmethod(_open)
183        TestCase.new_io = _open is not open
184        TestCase.__name__ = prefix + LargeFileTest.__name__
185        suite.addTest(TestCase('test_seek'))
186        suite.addTest(TestCase('test_osstat'))
187        suite.addTest(TestCase('test_seek_read'))
188        suite.addTest(TestCase('test_lseek'))
189        with _open(TESTFN, 'wb') as f:
190            if hasattr(f, 'truncate'):
191                suite.addTest(TestCase('test_truncate'))
192        suite.addTest(TestCase('test_seekable'))
193        unlink(TESTFN)
194    try:
195        run_unittest(suite)
196    finally:
197        unlink(TESTFN)
198
199if __name__ == '__main__':
200    test_main()
201