1# Adapted from test_file.py by Daniel Stutzbach
2
3from __future__ import unicode_literals
4
5import sys
6import os
7import errno
8import unittest
9from array import array
10from weakref import proxy
11from functools import wraps
12from UserList import UserList
13
14from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd
15from test.test_support import py3k_bytes as bytes, cpython_only
16from test.script_helper import run_python
17
18from _io import FileIO as _FileIO
19
20class AutoFileTests(unittest.TestCase):
21    # file tests for which a test file is automatically set up
22
23    def setUp(self):
24        self.f = _FileIO(TESTFN, 'w')
25
26    def tearDown(self):
27        if self.f:
28            self.f.close()
29        os.remove(TESTFN)
30
31    def testWeakRefs(self):
32        # verify weak references
33        p = proxy(self.f)
34        p.write(bytes(range(10)))
35        self.assertEqual(self.f.tell(), p.tell())
36        self.f.close()
37        self.f = None
38        self.assertRaises(ReferenceError, getattr, p, 'tell')
39
40    def testSeekTell(self):
41        self.f.write(bytes(range(20)))
42        self.assertEqual(self.f.tell(), 20)
43        self.f.seek(0)
44        self.assertEqual(self.f.tell(), 0)
45        self.f.seek(10)
46        self.assertEqual(self.f.tell(), 10)
47        self.f.seek(5, 1)
48        self.assertEqual(self.f.tell(), 15)
49        self.f.seek(-5, 1)
50        self.assertEqual(self.f.tell(), 10)
51        self.f.seek(-5, 2)
52        self.assertEqual(self.f.tell(), 15)
53
54    def testAttributes(self):
55        # verify expected attributes exist
56        f = self.f
57
58        self.assertEqual(f.mode, "wb")
59        self.assertEqual(f.closed, False)
60
61        # verify the attributes are readonly
62        for attr in 'mode', 'closed':
63            self.assertRaises((AttributeError, TypeError),
64                              setattr, f, attr, 'oops')
65
66    def testReadinto(self):
67        # verify readinto
68        self.f.write(b"\x01\x02")
69        self.f.close()
70        a = array(b'b', b'x'*10)
71        self.f = _FileIO(TESTFN, 'r')
72        n = self.f.readinto(a)
73        self.assertEqual(array(b'b', [1, 2]), a[:n])
74
75    def testWritelinesList(self):
76        l = [b'123', b'456']
77        self.f.writelines(l)
78        self.f.close()
79        self.f = _FileIO(TESTFN, 'rb')
80        buf = self.f.read()
81        self.assertEqual(buf, b'123456')
82
83    def testWritelinesUserList(self):
84        l = UserList([b'123', b'456'])
85        self.f.writelines(l)
86        self.f.close()
87        self.f = _FileIO(TESTFN, 'rb')
88        buf = self.f.read()
89        self.assertEqual(buf, b'123456')
90
91    def testWritelinesError(self):
92        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
93        self.assertRaises(TypeError, self.f.writelines, None)
94
95    def test_none_args(self):
96        self.f.write(b"hi\nbye\nabc")
97        self.f.close()
98        self.f = _FileIO(TESTFN, 'r')
99        self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
100        self.f.seek(0)
101        self.assertEqual(self.f.readline(None), b"hi\n")
102        self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
103
104    def testRepr(self):
105        self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"
106                                       % (self.f.name, self.f.mode))
107        del self.f.name
108        self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"
109                                       % (self.f.fileno(), self.f.mode))
110        self.f.close()
111        self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
112
113    def testErrors(self):
114        f = self.f
115        self.assertFalse(f.isatty())
116        self.assertFalse(f.closed)
117        #self.assertEqual(f.name, TESTFN)
118        self.assertRaises(ValueError, f.read, 10) # Open for reading
119        f.close()
120        self.assertTrue(f.closed)
121        f = _FileIO(TESTFN, 'r')
122        self.assertRaises(TypeError, f.readinto, "")
123        self.assertFalse(f.closed)
124        f.close()
125        self.assertTrue(f.closed)
126
127    def testMethods(self):
128        methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
129                   'read', 'readall', 'readline', 'readlines',
130                   'tell', 'truncate', 'flush']
131        if sys.platform.startswith('atheos'):
132            methods.remove('truncate')
133
134        self.f.close()
135        self.assertTrue(self.f.closed)
136
137        for methodname in methods:
138            method = getattr(self.f, methodname)
139            # should raise on closed file
140            self.assertRaises(ValueError, method)
141
142        self.assertRaises(ValueError, self.f.readinto) # XXX should be TypeError?
143        self.assertRaises(ValueError, self.f.readinto, bytearray(1))
144        self.assertRaises(ValueError, self.f.seek)
145        self.assertRaises(ValueError, self.f.seek, 0)
146        self.assertRaises(ValueError, self.f.write)
147        self.assertRaises(ValueError, self.f.write, b'')
148        self.assertRaises(TypeError, self.f.writelines)
149        self.assertRaises(ValueError, self.f.writelines, b'')
150
151    def testOpendir(self):
152        # Issue 3703: opening a directory should fill the errno
153        # Windows always returns "[Errno 13]: Permission denied
154        # Unix calls dircheck() and returns "[Errno 21]: Is a directory"
155        try:
156            _FileIO('.', 'r')
157        except IOError as e:
158            self.assertNotEqual(e.errno, 0)
159            self.assertEqual(e.filename, ".")
160        else:
161            self.fail("Should have raised IOError")
162
163    @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
164    def testOpenDirFD(self):
165        fd = os.open('.', os.O_RDONLY)
166        with self.assertRaises(IOError) as cm:
167            _FileIO(fd, 'r')
168        os.close(fd)
169        self.assertEqual(cm.exception.errno, errno.EISDIR)
170
171    #A set of functions testing that we get expected behaviour if someone has
172    #manually closed the internal file descriptor.  First, a decorator:
173    def ClosedFD(func):
174        @wraps(func)
175        def wrapper(self):
176            #forcibly close the fd before invoking the problem function
177            f = self.f
178            os.close(f.fileno())
179            try:
180                func(self, f)
181            finally:
182                try:
183                    self.f.close()
184                except IOError:
185                    pass
186        return wrapper
187
188    def ClosedFDRaises(func):
189        @wraps(func)
190        def wrapper(self):
191            #forcibly close the fd before invoking the problem function
192            f = self.f
193            os.close(f.fileno())
194            try:
195                func(self, f)
196            except IOError as e:
197                self.assertEqual(e.errno, errno.EBADF)
198            else:
199                self.fail("Should have raised IOError")
200            finally:
201                try:
202                    self.f.close()
203                except IOError:
204                    pass
205        return wrapper
206
207    @ClosedFDRaises
208    def testErrnoOnClose(self, f):
209        f.close()
210
211    @ClosedFDRaises
212    def testErrnoOnClosedWrite(self, f):
213        f.write('a')
214
215    @ClosedFDRaises
216    def testErrnoOnClosedSeek(self, f):
217        f.seek(0)
218
219    @ClosedFDRaises
220    def testErrnoOnClosedTell(self, f):
221        f.tell()
222
223    @ClosedFDRaises
224    def testErrnoOnClosedTruncate(self, f):
225        f.truncate(0)
226
227    @ClosedFD
228    def testErrnoOnClosedSeekable(self, f):
229        f.seekable()
230
231    @ClosedFD
232    def testErrnoOnClosedReadable(self, f):
233        f.readable()
234
235    @ClosedFD
236    def testErrnoOnClosedWritable(self, f):
237        f.writable()
238
239    @ClosedFD
240    def testErrnoOnClosedFileno(self, f):
241        f.fileno()
242
243    @ClosedFD
244    def testErrnoOnClosedIsatty(self, f):
245        self.assertEqual(f.isatty(), False)
246
247    def ReopenForRead(self):
248        try:
249            self.f.close()
250        except IOError:
251            pass
252        self.f = _FileIO(TESTFN, 'r')
253        os.close(self.f.fileno())
254        return self.f
255
256    @ClosedFDRaises
257    def testErrnoOnClosedRead(self, f):
258        f = self.ReopenForRead()
259        f.read(1)
260
261    @ClosedFDRaises
262    def testErrnoOnClosedReadall(self, f):
263        f = self.ReopenForRead()
264        f.readall()
265
266    @ClosedFDRaises
267    def testErrnoOnClosedReadinto(self, f):
268        f = self.ReopenForRead()
269        a = array(b'b', b'x'*10)
270        f.readinto(a)
271
class OtherFileTests(unittest.TestCase):
    # FileIO tests that create their own files and are responsible for
    # removing them again.  Tests are described with comments rather than
    # docstrings so that unittest keeps reporting them by method name.

    def testAbles(self):
        # readable()/writable()/seekable() must reflect the open mode
        try:
            f = _FileIO(TESTFN, "w")
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            f.close()

            f = _FileIO(TESTFN, "r")
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            f.close()

            f = _FileIO(TESTFN, "a+")
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.assertEqual(f.isatty(), False)
            f.close()
        finally:
            os.unlink(TESTFN)

    @unittest.skipIf(sys.platform == 'win32', 'no ttys on Windows')
    def testAblesOnTTY(self):
        try:
            f = _FileIO("/dev/tty", "a")
        except EnvironmentError:
            # When run in a cron job there just aren't any
            # ttys, so skip the test.  This also handles other
            # OS'es that don't support /dev/tty.
            self.skipTest('need /dev/tty')
        else:
            try:
                self.assertEqual(f.readable(), False)
                self.assertEqual(f.writable(), True)
                if sys.platform != "darwin" and \
                   'bsd' not in sys.platform and \
                   not sys.platform.startswith(('sunos', 'aix')):
                    # Somehow /dev/tty appears seekable on some BSDs
                    self.assertEqual(f.seekable(), False)
                self.assertEqual(f.isatty(), True)
            finally:
                # release the tty even when an assertion fails
                f.close()

    def testInvalidModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+", "rw", "rt"):
            try:
                f = _FileIO(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

    def testModeStrings(self):
        # test that the mode attribute is correct for various mode strings
        # given as init args
        try:
            for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
                          ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
                          ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
                          ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
                # read modes are last so that TESTFN will exist first
                with _FileIO(TESTFN, modes[0]) as f:
                    self.assertEqual(f.mode, modes[1])
        finally:
            if os.path.exists(TESTFN):
                os.unlink(TESTFN)

    def testUnicodeOpen(self):
        # verify that a file can be created from str(TESTFN); only opening
        # and closing is exercised here, the repr is not inspected
        f = _FileIO(str(TESTFN), "w")
        f.close()
        os.unlink(TESTFN)

    def testBytesOpen(self):
        # Opening a bytes filename
        try:
            fn = TESTFN.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest('could not encode %r to ascii' % TESTFN)
        f = _FileIO(fn, "w")
        try:
            try:
                f.write(b"abc")
            finally:
                # close before reading back (and before the unlink below)
                # even when the write fails
                f.close()
            with open(TESTFN, "rb") as check:
                self.assertEqual(check.read(), b"abc")
        finally:
            os.unlink(TESTFN)

    def testConstructorHandlesNULChars(self):
        # file names with an embedded NUL are rejected, text or bytes alike
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, _FileIO, fn_with_NUL, 'w')
        self.assertRaises(TypeError, _FileIO, fn_with_NUL.encode('ascii'), 'w')

    def testInvalidFd(self):
        # negative and unused file descriptors are rejected
        self.assertRaises(ValueError, _FileIO, -10)
        self.assertRaises(OSError, _FileIO, make_bad_fd())
        if sys.platform == 'win32':
            import msvcrt
            self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())

    @cpython_only
    def testInvalidFd_overflow(self):
        # Issue 15989
        import _testcapi
        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = _FileIO(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testTruncate(self):
        # truncate() changes the file size (shrinking and growing) without
        # moving the file position
        f = _FileIO(TESTFN, 'w')
        try:
            f.write(bytes(bytearray(range(10))))
            self.assertEqual(f.tell(), 10)
            f.truncate(5)
            self.assertEqual(f.tell(), 10)
            self.assertEqual(f.seek(0, os.SEEK_END), 5)
            f.truncate(15)
            self.assertEqual(f.tell(), 5)
            self.assertEqual(f.seek(0, os.SEEK_END), 15)
        finally:
            # close the handle and remove the file even on failure, like
            # the other tests of this class do
            f.close()
            os.unlink(TESTFN)

    def testTruncateOnWindows(self):
        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = _FileIO(TESTFN, 'w')
            f.write(bytes(range(11)))
            f.close()

            f = _FileIO(TESTFN,'r+')
            data = f.read(5)
            if data != bytes(range(5)):
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            # truncating at the current position must not move it
            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testAppend(self):
        # Data written in append mode must end up after the existing
        # content.  _FileIO is used throughout so that the class under test
        # is what gets exercised, not the builtin open().
        try:
            with _FileIO(TESTFN, 'wb') as f:
                f.write(b'spam')
            with _FileIO(TESTFN, 'ab') as f:
                f.write(b'eggs')
            with _FileIO(TESTFN, 'rb') as f:
                d = f.read()
            self.assertEqual(d, b'spameggs')
        finally:
            # best-effort cleanup: the file may not exist if opening failed
            try:
                os.unlink(TESTFN)
            except OSError:
                pass

    def testInvalidInit(self):
        # a non-string mode argument is rejected
        self.assertRaises(TypeError, _FileIO, "1", 0, 0)

    def testWarnings(self):
        # failing constructor calls must not emit any warning
        with check_warnings(quiet=True) as w:
            self.assertEqual(w.warnings, [])
            self.assertRaises(TypeError, _FileIO, [])
            self.assertEqual(w.warnings, [])
            self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
            self.assertEqual(w.warnings, [])

    def test_surrogates(self):
        # Issue #8438: try to open a filename containing surrogates.
        # It should either fail because the file doesn't exist or the filename
        # can't be represented using the filesystem encoding, but not because
        # of a LookupError for the error handler "surrogateescape".
        filename = u'\udc80.txt'
        try:
            with _FileIO(filename):
                pass
        except (UnicodeEncodeError, IOError):
            pass
        # Spawn a separate Python process with a different "file system
        # default encoding", to exercise this further.
        env = dict(os.environ)
        env[b'LC_CTYPE'] = b'C'
        _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)
        if ('UnicodeEncodeError' not in out and not
                ( ('IOError: [Errno 2] No such file or directory' in out) or
                  ('IOError: [Errno 22] Invalid argument' in out) ) ):
            self.fail('Bad output: %r' % out)

    def testUnclosedFDOnException(self):
        # When the constructor fails for a caller-supplied descriptor, the
        # descriptor must be left open for the caller to close.
        class MyException(Exception): pass
        class MyFileIO(_FileIO):
            def __setattr__(self, name, value):
                if name == "name":
                    raise MyException("blocked setting name")
                return super(MyFileIO, self).__setattr__(name, value)
        fd = os.open(__file__, os.O_RDONLY)
        self.assertRaises(MyException, MyFileIO, fd)
        os.close(fd)  # should not raise OSError(EBADF)
500
def test_main():
    """Run every FileIO test case, then delete TESTFN if a test left it."""
    try:
        test_cases = (AutoFileTests, OtherFileTests)
        run_unittest(*test_cases)
    finally:
        # Historically, these tests have been sloppy about removing TESTFN,
        # so get rid of it here no matter how the run ended.
        leftover = os.path.exists(TESTFN)
        if leftover:
            os.unlink(TESTFN)

# Allow running this test module directly as a script.
if __name__ == '__main__':
    test_main()
512