1# -*- coding: iso-8859-15 -*-
2
3import sys
4import os
5import shutil
6import StringIO
7from hashlib import md5
8import errno
9
10import unittest
11import tarfile
12
13from test import test_support
14
15# Check for our compression modules.
16try:
17    import gzip
18    gzip.GzipFile
19except (ImportError, AttributeError):
20    gzip = None
21try:
22    import bz2
23except ImportError:
24    bz2 = None
25
def md5sum(data):
    """Return the MD5 digest of *data* as a hexadecimal string."""
    digest = md5(data)
    return digest.hexdigest()
28
# Absolute path built from test_support.TESTFN; the compressed archives and
# the scratch archive below all live inside it.
TEMPDIR = os.path.abspath(test_support.TESTFN)
# Uncompressed reference archive, located through test_support.findfile().
tarname = test_support.findfile("testtar.tar")
# Paths of the gzip/bz2 variants of the reference archive.
# NOTE(review): their creation is not visible in this part of the file --
# presumably done by the test driver; confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
# Scratch archive that individual tests write and then read back.
tmpname = os.path.join(TEMPDIR, "tmp.tar")

# Expected MD5 hex digests; the tests compare them with md5sum() of the data
# extracted from the corresponding archive members.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
37
38
class ReadTest(unittest.TestCase):
    # Shared fixture for the read tests: every test method gets the archive
    # named by `tarname` opened in `mode` as self.tar, and it is closed
    # again once the test method has finished.

    tarname = tarname
    mode = "r:"

    def setUp(self):
        archive = tarfile.open(self.tarname, mode=self.mode,
                               encoding="iso8859-1")
        self.tar = archive

    def tearDown(self):
        archive = self.tar
        archive.close()
49
50
class UstarReadTest(ReadTest):
    # Checks the file objects returned by TarFile.extractfile() for the
    # members stored below "ustar/" in the reference archive.

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)),
                (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_fileobj_readlines(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Read the extracted copy inside a with-block so that its handle is
        # closed even if one of the assertions below fails.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        lines2 = fobj2.readlines()

        self.assertEqual(lines1, lines2,
                "fileobj.readlines() failed")
        self.assertEqual(len(lines2), 114,
                "fileobj.readlines() failed")
        self.assertEqual(lines2[83],
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        # Iterating the archive file object must yield the same lines.
        lines2 = list(fobj2)
        self.assertEqual(lines1, lines2,
                     "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Reference data: the raw bytes of the member as extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)

        # Consume everything first so that seek(0) really has to move back.
        fobj.read()
        fobj.seek(0)
        self.assertEqual(0, fobj.tell(),
                     "seek() to file's start failed")
        fobj.seek(2048, 0)
        self.assertEqual(2048, fobj.tell(),
                     "seek() to absolute position failed")
        fobj.seek(-1024, 1)
        self.assertEqual(1024, fobj.tell(),
                     "seek() to negative relative position failed")
        fobj.seek(1024, 1)
        self.assertEqual(2048, fobj.tell(),
                     "seek() to positive relative position failed")
        s = fobj.read(10)
        self.assertEqual(s, data[2048:2058],
                     "read() after seek failed")
        fobj.seek(0, 2)
        self.assertEqual(tarinfo.size, fobj.tell(),
                     "seek() to file's end failed")
        self.assertEqual(fobj.read(), "",
                     "read() at file's end did not return empty string")
        fobj.seek(-tarinfo.size, 2)
        self.assertEqual(0, fobj.tell(),
                     "relative seek() to file's start failed")
        fobj.seek(512)
        s1 = fobj.readlines()
        fobj.seek(512)
        s2 = fobj.readlines()
        self.assertEqual(s1, s2,
                     "readlines() after seek failed")
        fobj.seek(0)
        self.assertEqual(len(fobj.readline()), fobj.tell(),
                     "tell() after readline() failed")
        fobj.seek(512)
        self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                     "tell() after seek() and readline() failed")
        fobj.seek(0)
        line = fobj.readline()
        self.assertEqual(fobj.read(), data[len(line):],
                     "read() after readline() failed")
        fobj.close()

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        a = self.tar.extractfile(lnktype)
        b = self.tar.extractfile(regtype)
        self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")
156
157
class CommonReadTest(ReadTest):
    # Read tests that use only self.mode, so that subclasses can repeat them
    # for other ways of opening an archive.

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        tarfile.open(tmpname, self.mode.replace("r", "w")).close()
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.getnames()
                members = tar.getmembers()
            finally:
                # Always release the archive, even when reading fails.
                tar.close()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        self.assertListEqual(members, [])

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        open(tmpname, "wb").close()
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in ('\0', 'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            fobj = _open(tmpname, "wb")
            try:
                # Two 512-byte blocks of filler followed by one valid header.
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())
            finally:
                fobj.close()

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                        "ignore_zeros=True should have skipped the %r-blocks" % char)
            finally:
                tar.close()
202
203
class MiscReadTest(CommonReadTest):
    # Assorted read tests: name handling for file objects, reading from an
    # offset, format quirks of single members and extraction to disk.

    def test_no_name_argument(self):
        # Without a name argument the archive takes its name from fileobj.
        with open(self.tarname, "rb") as fobj:
            tar = tarfile.open(fileobj=fobj, mode=self.mode)
            try:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))
            finally:
                tar.close()

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as source:
            data = source.read()
        fobj = StringIO.StringIO(data)
        # Precondition: the in-memory file object really has no name.
        self.assertRaises(AttributeError, getattr, fobj, "name")
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as source:
            data = source.read()
        fobj = StringIO.StringIO(data)
        fobj.name = ""
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        tar = tarfile.open(self.tarname, mode=self.mode)
        try:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            data = tar.extractfile(t).read()
        finally:
            tar.close()

        # Open the testtar and seek to the offset of the second member.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open
        fobj = _open(self.tarname, "rb")
        try:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            tar = tarfile.open(self.tarname, mode="r:", fileobj=fobj)
            try:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")
            finally:
                tar.close()
        finally:
            fobj.close()

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            return
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                    fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            # The owner name is only checked for members below ustar/.
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    def _extract_and_check_link(self, tar, member, failmsg):
        # Extract the link `member` into TEMPDIR and verify that the result
        # has the content of the regular file it points to.  Only ENOENT is
        # reported as a failure of the extraction itself; other environment
        # errors are ignored here, as before.
        try:
            tar.extract(member, TEMPDIR)
        except EnvironmentError as e:
            if e.errno == errno.ENOENT:
                self.fail(failmsg)

        with open(os.path.join(TEMPDIR, member), "rb") as fobj:
            data = fobj.read()
        self.assertEqual(md5sum(data), md5_regtype)

    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
        try:
            tar.extract("ustar/regtype", TEMPDIR)
            self._extract_and_check_link(tar, "ustar/lnktype",
                    "hardlink not extracted properly")
            self._extract_and_check_link(tar, "ustar/symtype",
                    "symlink not extracted properly")
        finally:
            tar.close()

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(TEMPDIR, directories)
            for tarinfo in directories:
                path = os.path.join(TEMPDIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777,
                            os.stat(path).st_mode & 0o777)
                self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
        finally:
            tar.close()

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        # Create the empty file and release its handle immediately.
        open(empty, "wb").close()

        try:
            # Allocate the instance separately from __init__() so that it
            # can still be inspected after the constructor has raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os.remove(empty)
349
350
class StreamReadTest(CommonReadTest):
    # Repeats the common read tests with the archive opened in stream mode
    # ("r|") and adds tests specific to that mode.

    mode = "r|"

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertEqual((len(data), md5sum(data)),
                (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # After all members have been read, going back to the first one
        # must be rejected with a StreamError.
        tarinfos = self.tar.getmembers()
        f = self.tar.extractfile(tarinfos[0]) # read the first member
        self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk the archive in parallel through a regular TarFile (tar1) and
        # the stream under test (tar2) and compare every member's data.
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertIsNotNone(t2, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    # Extracting a link from the stream must be refused.
                    self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertIsNotNone(v2, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(), "stream extraction failed")
        finally:
            # Release the reference archive even if a comparison failed.
            tar1.close()
390
391
class DetectReadTest(unittest.TestCase):
    # Checks which mode strings tarfile.open() accepts or rejects for the
    # plain, gzip and bz2 versions of the reference archive.

    def _testfunc_file(self, name, mode):
        # Open the archive by file name; it must not raise ReadError.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail("tarfile.open(%r, %r) raised ReadError" % (name, mode))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same check, but hand tarfile.open() an already opened file object.
        with open(name, "rb") as fobj:
            try:
                tar = tarfile.open(name, mode, fileobj=fobj)
            except tarfile.ReadError:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised ReadError"
                        % (name, mode))
            else:
                tar.close()

    def _test_compressed_modes(self, testfunc, compname, comptype):
        # comptype is the mode suffix ("gz" or "bz2"), compname the matching
        # compressed copy of the reference archive.
        # The plain archive must be rejected in an explicit compression mode.
        for mode in ("r:" + comptype, "r|" + comptype):
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode=mode)
        # The compressed archive must be rejected in the uncompressed modes.
        for mode in ("r:", "r|"):
            self.assertRaises(tarfile.ReadError, tarfile.open, compname, mode=mode)

        for mode in ("r", "r:*", "r:" + comptype, "r|*", "r|" + comptype):
            testfunc(compname, mode)

    def _test_modes(self, testfunc):
        for mode in ("r", "r:", "r:*", "r|", "r|*"):
            testfunc(tarname, mode)

        # gzip and bz2 are None when the module could not be imported.
        if gzip:
            self._test_compressed_modes(testfunc, gzipname, "gz")

        if bz2:
            self._test_compressed_modes(testfunc, bz2name, "bz2")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
442
443
class MemberReadTest(ReadTest):
    # Looks up individual members of the reference archive and compares
    # their header fields (and, where given, their data) with known values.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Verify one member.  `chksum`, if given, is the expected MD5 hex
        # digest of the member's data; every keyword argument names a
        # TarInfo attribute and its expected value.
        if chksum is not None:
            self.assertEqual(md5sum(self.tar.extractfile(tarinfo).read()), chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        # Fields that are expected to be identical for every member.
        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.items():
            self.assertEqual(getattr(tarinfo, k), v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        # The non-ASCII part of the name is spelled with \x escapes so that
        # it does not depend on how this file's bytes are decoded.
        # NOTE(review): the literal was unreadable in the reviewed copy; the
        # escapes assume the seven Latin-1 characters A/O/U-umlaut,
        # a/o/u-umlaut and sharp s -- confirm against testtar.tar.
        tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Close the archive opened by setUp() before replacing it, otherwise
        # its file handle is never released.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        # NOTE(review): same assumption about the escaped characters as in
        # test_find_umlauts -- confirm against testtar.tar.
        tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)
518
519
class LongnameTest(ReadTest):
    # Tests for members whose names are too long for a plain header.  The
    # attributes `subdir` and `longnametype` are read here but not defined;
    # GNUReadTest and PaxReadTest provide them.

    def _longpath(self, basename):
        # Build the long member path below self.subdir ending in `basename`.
        return self.subdir + "/" + "123/" * 125 + basename

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self._longpath("longname")
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        longname = self._longpath("longname")
        longlink = self._longpath("longlink")
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(tarinfo.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three 512-byte blocks starting at the member's
        # offset into a new file object; opening that must raise ReadError.
        longname = self._longpath("longname")
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self._longpath("longname")
        offset = self.tar.getmember(longname).offset
        # The archive is binary data: read it in binary mode (text mode
        # would translate bytes on some platforms) and close the handle.
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
        self.assertEqual(tarinfo.type, self.longnametype)
557
558
class GNUReadTest(LongnameTest):
    # Runs the long name tests on the "gnu" part of the archive and adds a
    # sparse file check.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    def test_sparse_file(self):
        # The member gnu/sparse must yield exactly the same data as the
        # member ustar/sparse.
        ustar_fobj = self.tar.extractfile(self.tar.getmember("ustar/sparse"))
        gnu_fobj = self.tar.extractfile(self.tar.getmember("gnu/sparse"))
        ustar_data = ustar_fobj.read()
        gnu_data = gnu_fobj.read()
        identical = ustar_data == gnu_data
        self.assertTrue(identical, "sparse file extraction failed")
571
572
class PaxReadTest(LongnameTest):
    # Runs the long name tests on the "pax" part of the archive and checks
    # values that are read from pax headers.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # Expected value of the VENDOR.umlauts header, spelled with escapes
        # so that it does not depend on how this file's bytes are decoded.
        # NOTE(review): the literal was unreadable in the reviewed copy; the
        # escapes assume the seven Latin-1 characters A/O/U-umlaut,
        # a/o/u-umlaut and sharp s -- confirm against testtar.tar.
        umlauts = u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf"

        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            # (member name, expected uname, expected gname)
            expected = (
                ("pax/regtype1", "foo", "bar"),
                ("pax/regtype2", "", "bar"),
                ("pax/regtype3", "tarfile", "tarfile"),
            )
            for name, uname, gname in expected:
                tarinfo = tar.getmember(name)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), umlauts)
        finally:
            tar.close()

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
        finally:
            tar.close()
607
608
class WriteTestBase(unittest.TestCase):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.
    # `mode` is not defined here; subclasses such as WriteTest set it.

    def test_fileobj_no_close(self):
        # Closing the TarFile must leave a caller-supplied file object open.
        fobj = StringIO.StringIO()
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        tar.addfile(tarfile.TarInfo("foo"))
        tar.close()
        self.assertFalse(fobj.closed, "external fileobjs must never be closed")
619
620
621class WriteTest(WriteTestBase):
622
623    mode = "w:"
624
625    def test_100_char_name(self):
626        # The name field in a tar header stores strings of at most 100 chars.
627        # If a string is shorter than 100 chars it has to be padded with '\0',
628        # which implies that a string of exactly 100 chars is stored without
629        # a trailing '\0'.
630        name = "0123456789" * 10
631        tar = tarfile.open(tmpname, self.mode)
632        t = tarfile.TarInfo(name)
633        tar.addfile(t)
634        tar.close()
635
636        tar = tarfile.open(tmpname)
637        self.assertTrue(tar.getnames()[0] == name,
638                "failed to store 100 char filename")
639        tar.close()
640
641    def test_tar_size(self):
642        # Test for bug #1013882.
643        tar = tarfile.open(tmpname, self.mode)
644        path = os.path.join(TEMPDIR, "file")
645        fobj = open(path, "wb")
646        fobj.write("aaa")
647        fobj.close()
648        tar.add(path)
649        tar.close()
650        self.assertTrue(os.path.getsize(tmpname) > 0,
651                "tarfile is empty")
652
653    # The test_*_size tests test for bug #1167128.
654    def test_file_size(self):
655        tar = tarfile.open(tmpname, self.mode)
656
657        path = os.path.join(TEMPDIR, "file")
658        fobj = open(path, "wb")
659        fobj.close()
660        tarinfo = tar.gettarinfo(path)
661        self.assertEqual(tarinfo.size, 0)
662
663        fobj = open(path, "wb")
664        fobj.write("aaa")
665        fobj.close()
666        tarinfo = tar.gettarinfo(path)
667        self.assertEqual(tarinfo.size, 3)
668
669        tar.close()
670
671    def test_directory_size(self):
672        path = os.path.join(TEMPDIR, "directory")
673        os.mkdir(path)
674        try:
675            tar = tarfile.open(tmpname, self.mode)
676            tarinfo = tar.gettarinfo(path)
677            self.assertEqual(tarinfo.size, 0)
678        finally:
679            os.rmdir(path)
680
681    def test_link_size(self):
682        if hasattr(os, "link"):
683            link = os.path.join(TEMPDIR, "link")
684            target = os.path.join(TEMPDIR, "link_target")
685            fobj = open(target, "wb")
686            fobj.write("aaa")
687            fobj.close()
688            os.link(target, link)
689            try:
690                tar = tarfile.open(tmpname, self.mode)
691                # Record the link target in the inodes list.
692                tar.gettarinfo(target)
693                tarinfo = tar.gettarinfo(link)
694                self.assertEqual(tarinfo.size, 0)
695            finally:
696                os.remove(target)
697                os.remove(link)
698
699    def test_symlink_size(self):
700        if hasattr(os, "symlink"):
701            path = os.path.join(TEMPDIR, "symlink")
702            os.symlink("link_target", path)
703            try:
704                tar = tarfile.open(tmpname, self.mode)
705                tarinfo = tar.gettarinfo(path)
706                self.assertEqual(tarinfo.size, 0)
707            finally:
708                os.remove(path)
709
710    def test_add_self(self):
711        # Test for #1257255.
712        dstname = os.path.abspath(tmpname)
713
714        tar = tarfile.open(tmpname, self.mode)
715        self.assertTrue(tar.name == dstname, "archive name must be absolute")
716
717        tar.add(dstname)
718        self.assertTrue(tar.getnames() == [], "added the archive to itself")
719
720        cwd = os.getcwd()
721        os.chdir(TEMPDIR)
722        tar.add(dstname)
723        os.chdir(cwd)
724        self.assertTrue(tar.getnames() == [], "added the archive to itself")
725
726    def test_exclude(self):
727        tempdir = os.path.join(TEMPDIR, "exclude")
728        os.mkdir(tempdir)
729        try:
730            for name in ("foo", "bar", "baz"):
731                name = os.path.join(tempdir, name)
732                open(name, "wb").close()
733
734            exclude = os.path.isfile
735
736            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
737            with test_support.check_warnings(("use the filter argument",
738                                              DeprecationWarning)):
739                tar.add(tempdir, arcname="empty_dir", exclude=exclude)
740            tar.close()
741
742            tar = tarfile.open(tmpname, "r")
743            self.assertEqual(len(tar.getmembers()), 1)
744            self.assertEqual(tar.getnames()[0], "empty_dir")
745        finally:
746            shutil.rmtree(tempdir)
747
748    def test_filter(self):
749        tempdir = os.path.join(TEMPDIR, "filter")
750        os.mkdir(tempdir)
751        try:
752            for name in ("foo", "bar", "baz"):
753                name = os.path.join(tempdir, name)
754                open(name, "wb").close()
755
756            def filter(tarinfo):
757                if os.path.basename(tarinfo.name) == "bar":
758                    return
759                tarinfo.uid = 123
760                tarinfo.uname = "foo"
761                return tarinfo
762
763            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
764            tar.add(tempdir, arcname="empty_dir", filter=filter)
765            tar.close()
766
767            tar = tarfile.open(tmpname, "r")
768            for tarinfo in tar:
769                self.assertEqual(tarinfo.uid, 123)
770                self.assertEqual(tarinfo.uname, "foo")
771            self.assertEqual(len(tar.getmembers()), 3)
772            tar.close()
773        finally:
774            shutil.rmtree(tempdir)
775
776    # Guarantee that stored pathnames are not modified. Don't
777    # remove ./ or ../ or double slashes. Still make absolute
778    # pathnames relative.
779    # For details see bug #6054.
780    def _test_pathname(self, path, cmp_path=None, dir=False):
781        # Create a tarfile with an empty member named path
782        # and compare the stored name with the original.
783        foo = os.path.join(TEMPDIR, "foo")
784        if not dir:
785            open(foo, "w").close()
786        else:
787            os.mkdir(foo)
788
789        tar = tarfile.open(tmpname, self.mode)
790        tar.add(foo, arcname=path)
791        tar.close()
792
793        tar = tarfile.open(tmpname, "r")
794        t = tar.next()
795        tar.close()
796
797        if not dir:
798            os.remove(foo)
799        else:
800            os.rmdir(foo)
801
802        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
803
804    def test_pathnames(self):
805        self._test_pathname("foo")
806        self._test_pathname(os.path.join("foo", ".", "bar"))
807        self._test_pathname(os.path.join("foo", "..", "bar"))
808        self._test_pathname(os.path.join(".", "foo"))
809        self._test_pathname(os.path.join(".", "foo", "."))
810        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
811        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
812        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
813        self._test_pathname(os.path.join("..", "foo"))
814        self._test_pathname(os.path.join("..", "foo", ".."))
815        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
816        self._test_pathname(os.path.join("..", "foo", "..", "bar"))
817
818        self._test_pathname("foo" + os.sep + os.sep + "bar")
819        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
820
821    def test_abs_pathnames(self):
822        if sys.platform == "win32":
823            self._test_pathname("C:\\foo", "foo")
824        else:
825            self._test_pathname("/foo", "foo")
826            self._test_pathname("///foo", "foo")
827
828    def test_cwd(self):
829        # Test adding the current working directory.
830        cwd = os.getcwd()
831        os.chdir(TEMPDIR)
832        try:
833            open("foo", "w").close()
834
835            tar = tarfile.open(tmpname, self.mode)
836            tar.add(".")
837            tar.close()
838
839            tar = tarfile.open(tmpname, "r")
840            for t in tar:
841                self.assert_(t.name == "." or t.name.startswith("./"))
842            tar.close()
843        finally:
844            os.chdir(cwd)
845
846    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
847    def test_extractall_symlinks(self):
848        # Test if extractall works properly when tarfile contains symlinks
849        tempdir = os.path.join(TEMPDIR, "testsymlinks")
850        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
851        os.mkdir(tempdir)
852        try:
853            source_file = os.path.join(tempdir,'source')
854            target_file = os.path.join(tempdir,'symlink')
855            with open(source_file,'w') as f:
856                f.write('something\n')
857            os.symlink(source_file, target_file)
858            tar = tarfile.open(temparchive,'w')
859            tar.add(source_file, arcname=os.path.basename(source_file))
860            tar.add(target_file, arcname=os.path.basename(target_file))
861            tar.close()
862            # Let's extract it to the location which contains the symlink
863            tar = tarfile.open(temparchive,'r')
864            # this should not raise OSError: [Errno 17] File exists
865            try:
866                tar.extractall(path=tempdir)
867            except OSError:
868                self.fail("extractall failed with symlinked files")
869            finally:
870                tar.close()
871        finally:
872            os.unlink(temparchive)
873            shutil.rmtree(tempdir)
874
875    @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
876    def test_extractall_broken_symlinks(self):
877        # Test if extractall works properly when tarfile contains broken
878        # symlinks
879        tempdir = os.path.join(TEMPDIR, "testsymlinks")
880        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
881        os.mkdir(tempdir)
882        try:
883            source_file = os.path.join(tempdir,'source')
884            target_file = os.path.join(tempdir,'symlink')
885            with open(source_file,'w') as f:
886                f.write('something\n')
887            os.symlink(source_file, target_file)
888            tar = tarfile.open(temparchive,'w')
889            tar.add(target_file, arcname=os.path.basename(target_file))
890            tar.close()
891            # remove the real file
892            os.unlink(source_file)
893            # Let's extract it to the location which contains the symlink
894            tar = tarfile.open(temparchive,'r')
895            # this should not raise OSError: [Errno 17] File exists
896            try:
897                tar.extractall(path=tempdir)
898            except OSError:
899                self.fail("extractall failed with broken symlinked files")
900            finally:
901                tar.close()
902        finally:
903            os.unlink(temparchive)
904            shutil.rmtree(tempdir)
905
906    @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
907    def test_extractall_hardlinks(self):
908        # Test if extractall works properly when tarfile contains symlinks
909        tempdir = os.path.join(TEMPDIR, "testsymlinks")
910        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
911        os.mkdir(tempdir)
912        try:
913            source_file = os.path.join(tempdir,'source')
914            target_file = os.path.join(tempdir,'symlink')
915            with open(source_file,'w') as f:
916                f.write('something\n')
917            os.link(source_file, target_file)
918            tar = tarfile.open(temparchive,'w')
919            tar.add(source_file, arcname=os.path.basename(source_file))
920            tar.add(target_file, arcname=os.path.basename(target_file))
921            tar.close()
922            # Let's extract it to the location which contains the symlink
923            tar = tarfile.open(temparchive,'r')
924            # this should not raise OSError: [Errno 17] File exists
925            try:
926                tar.extractall(path=tempdir)
927            except OSError:
928                self.fail("extractall failed with linked files")
929            finally:
930                tar.close()
931        finally:
932            os.unlink(temparchive)
933            shutil.rmtree(tempdir)
934
class StreamWriteTest(WriteTestBase):
    # Tests for archives written in stream mode.  Every archive is opened
    # with self.mode, so a subclass can override mode to run the same
    # tests with a different mode string.

    mode = "w|"

    def test_stream_padding(self):
        # Test for bug #1543303.
        # An archive without members, once decompressed, must contain
        # exactly tarfile.RECORDSIZE NUL bytes.
        tar = tarfile.open(tmpname, self.mode)
        tar.close()

        if self.mode.endswith("gz"):
            fobj = gzip.GzipFile(tmpname)
            try:
                data = fobj.read()
            finally:
                fobj.close()
        elif self.mode.endswith("bz2"):
            dec = bz2.BZ2Decompressor()
            # Close the file explicitly instead of leaving that to the
            # garbage collector.
            with open(tmpname, "rb") as fobj:
                data = fobj.read()
            data = dec.decompress(data)
            self.assertTrue(len(dec.unused_data) == 0,
                    "found trailing data")
        else:
            with open(tmpname, "rb") as fobj:
                data = fobj.read()

        self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
                         "incorrect zero padding")

    # Report the test as skipped (instead of silently passing) where the
    # permission check cannot be carried out.
    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "needs a working os.umask")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os.remove(tmpname)

        # With a umask of 022 a newly created archive is expected to end
        # up with mode 0644.
        original_umask = os.umask(0o022)
        try:
            tar = tarfile.open(tmpname, self.mode)
            tar.close()
            mode = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(mode, 0o644, "wrong file permissions")
        finally:
            # The umask is process-wide, so always put the old one back.
            os.umask(original_umask)
979
980
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Return len(s) + 1 rounded up to a multiple of 512, i.e. the
        # number of bytes taken by the blocks that hold s plus one extra
        # byte (presumably the terminating NUL).
        blocks, remainder = divmod(len(s) + 1, 512)
        if remainder:
            blocks += 1
        return blocks * 512

    def _calc_size(self, name, link=None):
        # Return the archive offset expected after adding one member
        # with the given name and (optional) link name.
        # Initial tar header
        count = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += 512
            count += self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += 512
            count += self._length(link)
        return count

    def _test(self, name, link=None):
        # Write one member called name (a hard link to link, if given)
        # to a GNU_FORMAT archive, compare the resulting offset with the
        # calculated size and check that the member can be read back.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
        finally:
            # Close the archive even if the size check fails.
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")
        finally:
            # The archive opened for reading used to be left open.
            tar.close()

    # The names and link names below are 1023, 1024 and 1025 characters
    # long, as the method names say.

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
1059
1060
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create the file "foo" and a second hard link "bar" to it in
        # TEMPDIR, then start an archive that already contains "foo".
        foo_path = os.path.join(TEMPDIR, "foo")
        bar_path = os.path.join(TEMPDIR, "bar")
        self.foo, self.bar = foo_path, bar_path

        with open(foo_path, "wb") as out:
            out.write("foo")

        os.link(foo_path, bar_path)

        self.tar = archive = tarfile.open(tmpname, "w")
        archive.add(foo_path)

    def tearDown(self):
        # Close the archive before deleting the two directory entries.
        self.tar.close()
        for path in (self.foo, self.bar):
            os.remove(path)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        member_type = self.tar.gettarinfo(self.foo).type
        self.assertTrue(member_type == tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" is a second link to the already added "foo", so it is
        # expected to be described as a LNKTYPE member.
        member_type = self.tar.gettarinfo(self.bar).type
        self.assertTrue(member_type == tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference switched on, the link is expected to be
        # described as a regular file.
        self.tar.dereference = True
        member_type = self.tar.gettarinfo(self.bar).type
        self.assertTrue(member_type == tarfile.REGTYPE,
                "dereferencing hardlink failed")
1099
1100
class PaxWriteTest(GNUWriteTest):
    # Runs the long name/link tests inherited from GNUWriteTest against
    # PAX_FORMAT archives and adds tests for pax headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        # Write one member to a PAX_FORMAT archive and check that its
        # link name (if link is given) or else its name is read back
        # unchanged.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(tarinfo)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            if link:
                l = tar.getmembers()[0].linkname
                self.assertTrue(link == l, "PAX longlink creation failed")
            else:
                n = tar.getmembers()[0].name
                self.assertTrue(name == n, "PAX longname creation failed")
        finally:
            # The archive opened for reading used to be left open.
            tar.close()

    def test_pax_global_header(self):
        # Headers passed as pax_headers to tarfile.open() must be found
        # again on the archive and on its first member.
        pax_headers = {
                u"foo": u"bar",
                u"uid": u"0",
                u"mtime": u"1.23",
                u"test": u"���",
                u"���": u"test"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)

            # Test if all the fields are unicode.
            for key, val in tar.pax_headers.iteritems():
                self.assertTrue(type(key) is unicode)
                self.assertTrue(type(val) is unicode)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    # Number fields must be accepted by their converter.
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")
        finally:
            tar.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {u"path": u"foo", u"uid": u"123"}

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
        try:
            t = tarfile.TarInfo()
            t.name = u"���"     # non-ASCII
            t.uid = 8**8        # too large
            t.pax_headers = pax_headers
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
        finally:
            tar.close()
1168
1169
class UstarUnicodeTest(unittest.TestCase):
    # All *UnicodeTests FIXME
    #
    # Tests for unicode and non-ASCII byte string values in member
    # fields.  Archives are written with self.format; subclasses
    # override format to run the same tests for other archive formats.
    # NOTE(review): the non-ASCII literals below depend on the source
    # encoding declared at the top of the file; edit them only with an
    # editor that preserves that encoding.

    format = tarfile.USTAR_FORMAT

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf8")

    def _test_unicode_filename(self, encoding):
        # Add a member with a unicode name to an archive opened with the
        # given encoding, then reopen the archive with the same encoding
        # and check that the name comes back as a non-unicode string
        # equal to name.encode(encoding).
        tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
        name = u"���"
        tar.addfile(tarfile.TarInfo(name))
        tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        self.assertTrue(type(tar.getnames()[0]) is not unicode)
        self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
        tar.close()

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", values that cannot
        # be converted must make addfile() raise UnicodeError.  The same
        # TarInfo object is modified and reused for every check, so the
        # order of the statements matters.
        tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
        tarinfo = tarfile.TarInfo()

        # A non-ASCII byte string name is rejected for PAX_FORMAT only;
        # for the other formats it is added without an error.
        tarinfo.name = "���"
        if self.format == tarfile.PAX_FORMAT:
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)
        else:
            tar.addfile(tarinfo)

        # A non-ASCII unicode name is rejected for every format.
        tarinfo.name = u"���"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)

        # The same holds for a non-ASCII unicode user name.
        tarinfo.name = "foo"
        tarinfo.uname = u"���"
        self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # The string fields of every member read from the test archive
        # must be of type str.
        tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
        for t in tar:
            self.assertTrue(type(t.name) is str)
            self.assertTrue(type(t.linkname) is str)
            self.assertTrue(type(t.uname) is str)
            self.assertTrue(type(t.gname) is str)
        tar.close()

    def test_uname_unicode(self):
        # uname and gname given as unicode or as a byte string must both
        # be read back equal to the byte string form.
        for name in (u"���", "���"):
            t = tarfile.TarInfo("foo")
            t.uname = name
            t.gname = name

            # Write to and read from an in-memory file object; "foo.tar"
            # is only passed as the archive name.
            fobj = StringIO.StringIO()
            tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
            tar.addfile(t)
            tar.close()
            fobj.seek(0)

            tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "���")
            self.assertEqual(t.gname, "���")
1237
1238
class GNUUnicodeTest(UstarUnicodeTest):
    # Inherits every test from UstarUnicodeTest; only the archive format
    # used for writing is changed to GNU_FORMAT.

    format = tarfile.GNU_FORMAT
1242
1243
class PaxUnicodeTest(UstarUnicodeTest):
    # Inherits the tests from UstarUnicodeTest with the archive format
    # changed to PAX_FORMAT, and adds tests for the unicode error
    # handlers applied when an archive is read.
    # NOTE(review): the non-ASCII literals below depend on the source
    # encoding declared at the top of the file; edit them only with an
    # editor that preserves that encoding.

    format = tarfile.PAX_FORMAT

    def _create_unicode_name(self, name):
        # Write an archive whose single member is an otherwise default
        # TarInfo carrying name in its "path" pax header.
        tar = tarfile.open(tmpname, "w", format=self.format)
        t = tarfile.TarInfo()
        t.pax_headers["path"] = name
        tar.addfile(t)
        tar.close()

    def test_error_handlers(self):
        # Test if the unicode error handlers work correctly for characters
        # that cannot be expressed in a given encoding.
        self._create_unicode_name(u"���")

        # Each pair is (value for errors, member name expected when the
        # archive is read with encoding="ascii").
        for handler, name in (("utf-8", u"���".encode("utf8")),
                    ("replace", "???"), ("ignore", "")):
            tar = tarfile.open(tmpname, format=self.format, encoding="ascii",
                    errors=handler)
            self.assertEqual(tar.getnames()[0], name)

        # With errors="strict" opening the archive itself must fail.
        self.assertRaises(UnicodeError, tarfile.open, tmpname,
                encoding="ascii", errors="strict")

    def test_error_handler_utf8(self):
        # Create a pathname that has one component representable using
        # iso8859-1 and the other only in iso8859-15.
        self._create_unicode_name(u"���/�")

        tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1",
                errors="utf-8")
        # Expected: the first component as a plain byte string, the
        # second one as the UTF-8 encoding of the character.
        self.assertEqual(tar.getnames()[0], "���/" + u"�".encode("utf8"))
1277
1278
class AppendTest(unittest.TestCase):
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without an existing archive file.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os.remove(self.tarname)

    def _add_testfile(self, fileobj=None):
        # Open the archive (or fileobj, if given) in append mode and add
        # an empty member named "bar".
        tar = tarfile.open(self.tarname, "a", fileobj=fileobj)
        try:
            tar.addfile(tarfile.TarInfo("bar"))
        finally:
            tar.close()

    def _create_testtar(self, mode="w:"):
        # Create the archive with the given mode; its only member is a
        # copy of "ustar/regtype" from the test archive, renamed to "foo".
        src = tarfile.open(tarname, encoding="iso8859-1")
        try:
            t = src.getmember("ustar/regtype")
            t.name = "foo"
            f = src.extractfile(t)
            tar = tarfile.open(self.tarname, mode)
            try:
                tar.addfile(t, f)
            finally:
                tar.close()
        finally:
            # The source archive used to be left open.
            src.close()

    def _test(self, names=None, fileobj=None):
        # Check that the archive (or fileobj, if given) contains exactly
        # the members in names, which defaults to ["bar"].  None is used
        # as the default so that no list object is shared between calls.
        if names is None:
            names = ["bar"]
        tar = tarfile.open(self.tarname, fileobj=fileobj)
        try:
            self.assertEqual(tar.getnames(), names)
        finally:
            tar.close()

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = StringIO.StringIO("\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        # Archives are binary data: read them in binary mode so that the
        # bytes are not altered on platforms that translate line ends.
        with open(self.tarname, "rb") as archive:
            data = archive.read()
        fobj = StringIO.StringIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    def test_append_gz(self):
        # Appending to a compressed archive must raise ReadError.
        if gzip is None:
            self.skipTest("requires gzip")
        self._create_testtar("w:gz")
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")

    def test_append_bz2(self):
        if bz2 is None:
            self.skipTest("requires bz2")
        self._create_testtar("w:bz2")
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Write and close the file before it is reopened for appending,
        # so that the data is certain to be on disk.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error("")

    def test_incomplete(self):
        self._test_error("\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + "\0" * 13)

    def test_invalid(self):
        self._test_error("a" * 512)
1367
1368
1369class LimitsTest(unittest.TestCase):
1370
1371    def test_ustar_limits(self):
1372        # 100 char name
1373        tarinfo = tarfile.TarInfo("0123456789" * 10)
1374        tarinfo.tobuf(tarfile.USTAR_FORMAT)
1375
1376        # 101 char name that cannot be stored
1377        tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
1378        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
1379
1380        # 256 char name with a slash at pos 156
1381        tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
1382        tarinfo.tobuf(tarfile.USTAR_FORMAT)
1383
1384        # 256 char name that cannot be stored
1385        tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
1386        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
1387
1388        # 512 char name
1389        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
1390        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
1391
1392        # 512 char linkname
1393        tarinfo = tarfile.TarInfo("longlink")
1394        tarinfo.linkname = "123/" * 126 + "longname"
1395        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
1396
1397        # uid > 8 digits
1398        tarinfo = tarfile.TarInfo("name")
1399        tarinfo.uid = 010000000
1400        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
1401
1402    def test_gnu_limits(self):
1403        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
1404        tarinfo.tobuf(tarfile.GNU_FORMAT)
1405
1406        tarinfo = tarfile.TarInfo("longlink")
1407        tarinfo.linkname = "123/" * 126 + "longname"
1408        tarinfo.tobuf(tarfile.GNU_FORMAT)
1409
1410        # uid >= 256 ** 7
1411        tarinfo = tarfile.TarInfo("name")
1412        tarinfo.uid = 04000000000000000000L
1413        self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)
1414
1415    def test_pax_limits(self):
1416        tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
1417        tarinfo.tobuf(tarfile.PAX_FORMAT)
1418
1419        tarinfo = tarfile.TarInfo("longlink")
1420        tarinfo.linkname = "123/" * 126 + "longname"
1421        tarinfo.tobuf(tarfile.PAX_FORMAT)
1422
1423        tarinfo = tarfile.TarInfo("name")
1424        tarinfo.uid = 04000000000000000000L
1425        tarinfo.tobuf(tarfile.PAX_FORMAT)
1426
1427
class ContextManagerTest(unittest.TestCase):
    # Tests for using TarFile objects in "with" statements.

    def test_basic(self):
        # The archive is open inside the block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise IOError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(IOError):
            with tar:
                pass

    def test_exception(self):
        # Test if the IOError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise IOError
        self.assertIsInstance(exc.exception, IOError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised above is of interest here;
            # KeyboardInterrupt and SystemExit are not swallowed.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        fobj = open(tmpname, "wb")
        try:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
        finally:
            # Close the file even if one of the assertions fails.
            fobj.close()
1485
1486
class LinkEmulationTest(ReadTest):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members as
    # the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract the member into TEMPDIR and compare the checksum of
        # the extracted file with md5_regtype.
        self.tar.extract(name, TEMPDIR)
        # Close the extracted file explicitly instead of leaving that to
        # the garbage collector.
        with open(os.path.join(TEMPDIR, name), "rb") as fobj:
            data = fobj.read()
        self.assertEqual(md5sum(data), md5_regtype)

    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
1508
1509
class GzipMiscReadTest(MiscReadTest):
    # Same tests as MiscReadTest, with tarname set to gzipname and
    # mode set to "r:gz".
    tarname = gzipname
    mode = "r:gz"
class GzipUstarReadTest(UstarReadTest):
    # Same tests as UstarReadTest, with tarname set to gzipname and
    # mode set to "r:gz".
    tarname = gzipname
    mode = "r:gz"
class GzipStreamReadTest(StreamReadTest):
    # Same tests as StreamReadTest, with tarname set to gzipname and
    # the stream mode "r|gz".
    tarname = gzipname
    mode = "r|gz"
class GzipWriteTest(WriteTest):
    # Same tests as WriteTest, with mode overridden to "w:gz".
    mode = "w:gz"
class GzipStreamWriteTest(StreamWriteTest):
    # Same tests as StreamWriteTest, with mode overridden to "w|gz".
    mode = "w|gz"
1523
1524
class Bz2MiscReadTest(MiscReadTest):
    # Same tests as MiscReadTest, with tarname set to bz2name and
    # mode set to "r:bz2".
    tarname = bz2name
    mode = "r:bz2"
class Bz2UstarReadTest(UstarReadTest):
    # Same tests as UstarReadTest, with tarname set to bz2name and
    # mode set to "r:bz2".
    tarname = bz2name
    mode = "r:bz2"
class Bz2StreamReadTest(StreamReadTest):
    # Same tests as StreamReadTest, with tarname set to bz2name and
    # the stream mode "r|bz2".
    tarname = bz2name
    mode = "r|bz2"
class Bz2WriteTest(WriteTest):
    # Same tests as WriteTest, with mode overridden to "w:bz2".
    mode = "w:bz2"
class Bz2StreamWriteTest(StreamWriteTest):
    # Same tests as StreamWriteTest, with mode overridden to "w|bz2".
    mode = "w|bz2"
1538
class Bz2PartialReadTest(unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Hand every prefix of a small bz2 stream to tarfile.open()
        # through a file object that raises AssertionError as soon as it
        # is read again after a read that started at end of file.
        class MyStringIO(StringIO.StringIO):
            hit_eof = False

            def read(self, n):
                if not self.hit_eof:
                    # Note whether this read starts at the end of data.
                    self.hit_eof = (self.pos == self.len)
                    return StringIO.StringIO.read(self, n)
                raise AssertionError("infinite loop detected in tarfile.open()")

            def seek(self, *args):
                # Repositioning the file starts the bookkeeping afresh.
                self.hit_eof = False
                return StringIO.StringIO.seek(self, *args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for end in range(len(compressed) + 1):
            truncated = MyStringIO(compressed[:end])
            try:
                tarfile.open(fileobj=truncated, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
1567
1568
def test_main():
    """Set up the test environment, run all test cases and clean up.

    Creates TEMPDIR, writes gzip and bz2 compressed copies of the test
    archive into it when the respective module is available, runs the
    selected test classes and finally removes TEMPDIR again.
    """
    os.makedirs(TEMPDIR)

    # TEMPDIR exists from here on: remove it again in all cases, also if
    # preparing the compressed archives fails before any test is run.
    try:
        tests = [
            UstarReadTest,
            MiscReadTest,
            StreamReadTest,
            DetectReadTest,
            MemberReadTest,
            GNUReadTest,
            PaxReadTest,
            WriteTest,
            StreamWriteTest,
            GNUWriteTest,
            PaxWriteTest,
            UstarUnicodeTest,
            GNUUnicodeTest,
            PaxUnicodeTest,
            AppendTest,
            LimitsTest,
            ContextManagerTest,
        ]

        # Test the creation of hard links where the platform has them,
        # and their extraction as regular files where it has not.
        if hasattr(os, "link"):
            tests.append(HardlinkTest)
        else:
            tests.append(LinkEmulationTest)

        with open(tarname, "rb") as fobj:
            data = fobj.read()

        if gzip:
            # Create testtar.tar.gz and add gzip-specific tests.
            tar = gzip.open(gzipname, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                GzipMiscReadTest,
                GzipUstarReadTest,
                GzipStreamReadTest,
                GzipWriteTest,
                GzipStreamWriteTest,
            ]

        if bz2:
            # Create testtar.tar.bz2 and add bz2-specific tests.
            tar = bz2.BZ2File(bz2name, "wb")
            try:
                tar.write(data)
            finally:
                tar.close()

            tests += [
                Bz2MiscReadTest,
                Bz2UstarReadTest,
                Bz2StreamReadTest,
                Bz2WriteTest,
                Bz2StreamWriteTest,
                Bz2PartialReadTest,
            ]

        test_support.run_unittest(*tests)
    finally:
        if os.path.exists(TEMPDIR):
            shutil.rmtree(TEMPDIR)
1635
# Run the complete test suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1638