# -*- coding: iso-8859-15 -*-

import sys
import os
import shutil
import StringIO
from hashlib import md5
import errno

import unittest
import tarfile

from test import test_support

# Check for our compression modules.
try:
    import gzip
    gzip.GzipFile
except (ImportError, AttributeError):
    gzip = None
try:
    import bz2
except ImportError:
    bz2 = None

def md5sum(data):
    """Return the hexadecimal MD5 digest of the byte string *data*."""
    return md5(data).hexdigest()

TEMPDIR = os.path.abspath(test_support.TESTFN)
tarname = test_support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
tmpname = os.path.join(TEMPDIR, "tmp.tar")

md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"


class ReadTest(unittest.TestCase):
    # Common fixture for the read tests: the archive named by the class
    # attribute `tarname` is opened with `mode` before every test and
    # closed again afterwards.

    tarname = tarname
    mode = "r:"

    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")

    def tearDown(self):
        self.tar.close()


class UstarReadTest(ReadTest):
    # Tests for the file objects returned by extractfile() on ustar members.

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_fileobj_readlines(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        # Read the extracted copy inside a with block so the handle is
        # released even if an assertion below fails.
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        lines2 = fobj2.readlines()

        self.assertTrue(lines1 == lines2,
                "fileobj.readlines() failed")
        self.assertTrue(len(lines2) == 114,
                "fileobj.readlines() failed")
        self.assertTrue(lines2[83] ==
                "I will gladly admit that Python is not the fastest running scripting language.\n",
                "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
            lines1 = fobj1.readlines()
        fobj2 = self.tar.extractfile(tarinfo)
        lines2 = [line for line in fobj2]
        self.assertTrue(lines1 == lines2,
                     "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Consume the whole member first so that the seeks below start
            # from the end of the file.
            fobj.read()
            fobj.seek(0)
            self.assertTrue(0 == fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertTrue(2048 == fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertTrue(1024 == fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertTrue(2048 == fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertTrue(s == data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertTrue(tarinfo.size == fobj.tell(),
                         "seek() to file's end failed")
            self.assertTrue(fobj.read() == "",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertTrue(0 == fobj.tell(),
                         "relative seek() to file's start failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertTrue(s1 == s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertTrue(len(fobj.readline()) == fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertTrue(fobj.read() == data[len(line):],
                         "read() after readline() failed")
        finally:
            fobj.close()

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        a = self.tar.extractfile(lnktype)
        b = self.tar.extractfile(regtype)
        self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")


class CommonReadTest(ReadTest):
    # Read tests shared by MiscReadTest and StreamReadTest.

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        tarfile.open(tmpname, self.mode.replace("r", "w")).close()
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.getnames()
                members = tar.getmembers()
            finally:
                tar.close()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        self.assertListEqual(members, [])

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        open(tmpname, "wb").close()
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open

        for char in ('\0', 'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            fobj = _open(tmpname, "wb")
            try:
                fobj.write(char * 1024)
                fobj.write(tarfile.TarInfo("foo").tobuf())
            finally:
                fobj.close()

            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
            try:
                self.assertListEqual(tar.getnames(), ["foo"],
                        "ignore_zeros=True should have skipped the %r-blocks" % char)
            finally:
                tar.close()


class MiscReadTest(CommonReadTest):
    # Read tests for archives opened in seekable ("r:") mode.

    def test_no_name_argument(self):
        with open(self.tarname, "rb") as fobj:
            tar = tarfile.open(fileobj=fobj, mode=self.mode)
            try:
                self.assertEqual(tar.name, os.path.abspath(fobj.name))
            finally:
                tar.close()

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as srcfile:
            data = srcfile.read()
        fobj = StringIO.StringIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as srcfile:
            data = srcfile.read()
        fobj = StringIO.StringIO(data)
        fobj.name = ""
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        try:
            self.assertEqual(tar.name, None)
        finally:
            tar.close()

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        tar = tarfile.open(self.tarname, mode=self.mode)
        try:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            data = tar.extractfile(t).read()
        finally:
            tar.close()

        # Open the testtar and seek to the offset of the second member.
        if self.mode.endswith(":gz"):
            _open = gzip.GzipFile
        elif self.mode.endswith(":bz2"):
            _open = bz2.BZ2File
        else:
            _open = open
        fobj = _open(self.tarname, "rb")
        try:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            tar = tarfile.open(self.tarname, mode="r:", fileobj=fobj)
            try:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                self.assertEqual(tar.extractfile(t).read(), data,
                        "seek back did not work")
            finally:
                tar.close()
        finally:
            fobj.close()

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        if self.mode == "r:":
            return
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open, fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertTrue(tarinfo.type == tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        for tarinfo in self.tar:
            self.assertTrue(int(tarinfo.mtime) == 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertTrue(tarinfo.uname == "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
                "could not find all members")

    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        tar = tarfile.open(tarname, errorlevel=1, encoding="iso8859-1")
        try:
            tar.extract("ustar/regtype", TEMPDIR)
            try:
                tar.extract("ustar/lnktype", TEMPDIR)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    self.fail("hardlink not extracted properly")

            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as fobj:
                data = fobj.read()
            self.assertEqual(md5sum(data), md5_regtype)

            try:
                tar.extract("ustar/symtype", TEMPDIR)
            except EnvironmentError as e:
                if e.errno == errno.ENOENT:
                    self.fail("symlink not extracted properly")

            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as fobj:
                data = fobj.read()
            self.assertEqual(md5sum(data), md5_regtype)
        finally:
            tar.close()

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(TEMPDIR, directories)
            for tarinfo in directories:
                path = os.path.join(TEMPDIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777)
                self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
        finally:
            tar.close()

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        open(empty, "wb").close()

        try:
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os.remove(empty)


class StreamReadTest(CommonReadTest):
    # Read tests for archives opened in stream ("r|") mode.

    mode="r|"

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        fobj = self.tar.extractfile(tarinfo)
        data = fobj.read()
        self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        tarinfos = self.tar.getmembers()
        f = self.tar.extractfile(tarinfos[0]) # read the first member
        self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk a seekable and a stream view of the same archive in lockstep
        # and compare the extracted data member by member.
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
        tar2 = self.tar
        try:
            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertTrue(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertTrue(v2 is not None, "stream.extractfile() failed")
                self.assertTrue(v1.read() == v2.read(), "stream extraction failed")
        finally:
            tar1.close()


class DetectReadTest(unittest.TestCase):
    # Check which archive/mode combinations tarfile.open() accepts and
    # which ones it rejects with a ReadError.

    def _testfunc_file(self, name, mode):
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail()
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        with open(name, "rb") as fobj:
            try:
                tar = tarfile.open(name, mode, fileobj=fobj)
            except tarfile.ReadError:
                self.fail()
            else:
                tar.close()

    def _test_modes(self, testfunc):
        testfunc(tarname, "r")
        testfunc(tarname, "r:")
        testfunc(tarname, "r:*")
        testfunc(tarname, "r|")
        testfunc(tarname, "r|*")

        if gzip:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")

            testfunc(gzipname, "r")
            testfunc(gzipname, "r:*")
            testfunc(gzipname, "r:gz")
            testfunc(gzipname, "r|*")
            testfunc(gzipname, "r|gz")

        if bz2:
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
            self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")

            testfunc(bz2name, "r")
            testfunc(bz2name, "r:*")
            testfunc(bz2name, "r:bz2")
            testfunc(bz2name, "r|*")
            testfunc(bz2name, "r|bz2")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)


class MemberReadTest(ReadTest):
    # Look up individual members of the test archive and verify their
    # header fields and, where given, the checksum of their data.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Compare tarinfo against the expected field values in kwargs plus
        # the values shared by the members checked here (mtime, uid, gid
        # and, except for old V7 members, uname/gname).
        if chksum is not None:
            self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        kwargs["mtime"] = 0o7606136617
        kwargs["uid"] = 1000
        kwargs["gid"] = 100
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs["uname"] = "tarfile"
            kwargs["gname"] = "tarfile"
        for k, v in kwargs.items():
            self.assertTrue(getattr(tarinfo, k) == v,
                    "wrong value in %s field of %s" % (k, tarinfo.name))

    def test_find_regtype(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        tarinfo = self.tar.getmember("ustar/conttype")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        tarinfo = self.tar.getmember("ustar/dirtype")
        self._test_member(tarinfo, size=0)

    def test_find_dirtype_with_size(self):
        tarinfo = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(tarinfo, size=255)

    def test_find_lnktype(self):
        tarinfo = self.tar.getmember("ustar/lnktype")
        self._test_member(tarinfo, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        tarinfo = self.tar.getmember("ustar/symtype")
        self._test_member(tarinfo, size=0, linkname="regtype")

    def test_find_blktype(self):
        tarinfo = self.tar.getmember("ustar/blktype")
        self._test_member(tarinfo, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        tarinfo = self.tar.getmember("ustar/chrtype")
        self._test_member(tarinfo, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        tarinfo = self.tar.getmember("ustar/fifotype")
        self._test_member(tarinfo, size=0)

    def test_find_sparse(self):
        tarinfo = self.tar.getmember("ustar/sparse")
        self._test_member(tarinfo, size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        # The umlauts are spelled as escapes so the member name does not
        # depend on how this source file is encoded on disk.
        tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        tarinfo = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Release the archive opened by setUp() before replacing it;
        # tearDown() only closes the one bound to self.tar at that time.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
        tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(tarinfo, size=7011, chksum=md5_regtype)


class LongnameTest(ReadTest):
    # Shared long name/long link tests. The methods read self.subdir and
    # self.longnametype, which GNUReadTest and PaxReadTest define.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")

    def test_read_longlink(self):
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        longlink = self.subdir + "/" + "123/" * 125 + "longlink"
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertTrue(tarinfo.linkname == longname, "linkname wrong")

    def test_truncated_longname(self):
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        tarinfo = self.tar.getmember(longname)
        offset = tarinfo.offset
        self.tar.fileobj.seek(offset)
        # Feed only the first three blocks of the member to tarfile.open().
        fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = self.subdir + "/" + "123/" * 125 + "longname"
        offset = self.tar.getmember(longname).offset
        # The archive is binary data: read it in "rb" mode so the header
        # block is not altered by text-mode translation, and close it.
        with open(tarname, "rb") as fobj:
            fobj.seek(offset)
            tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
        self.assertEqual(tarinfo.type, self.longnametype)


class GNUReadTest(LongnameTest):

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    def test_sparse_file(self):
        tarinfo1 = self.tar.getmember("ustar/sparse")
        fobj1 = self.tar.extractfile(tarinfo1)
        tarinfo2 = self.tar.getmember("gnu/sparse")
        fobj2 = self.tar.extractfile(tarinfo2)
        self.assertTrue(fobj1.read() == fobj2.read(),
                "sparse file extraction failed")


class PaxReadTest(LongnameTest):

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype1")
            self.assertEqual(tarinfo.uname, "foo")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")

            tarinfo = tar.getmember("pax/regtype2")
            self.assertEqual(tarinfo.uname, "")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")

            tarinfo = tar.getmember("pax/regtype3")
            self.assertEqual(tarinfo.uname, "tarfile")
            self.assertEqual(tarinfo.gname, "tarfile")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        finally:
            tar.close()

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
        finally:
            tar.close()


class WriteTestBase(unittest.TestCase):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        fobj = StringIO.StringIO()
        tar = tarfile.open(fileobj=fobj, mode=self.mode)
        tar.addfile(tarfile.TarInfo("foo"))
        tar.close()
        self.assertTrue(fobj.closed is False, "external fileobjs must never closed")


class WriteTest(WriteTestBase):

    mode = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertTrue(tar.getnames()[0] == name,
                    "failed to store 100 char filename")
        finally:
            tar.close()

    def test_tar_size(self):
        # Test for bug #1013882.
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write("aaa")
            tar.add(path)
        finally:
            tar.close()
        self.assertTrue(os.path.getsize(tmpname) > 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
654 def test_file_size(self): 655 tar = tarfile.open(tmpname, self.mode) 656 657 path = os.path.join(TEMPDIR, "file") 658 fobj = open(path, "wb") 659 fobj.close() 660 tarinfo = tar.gettarinfo(path) 661 self.assertEqual(tarinfo.size, 0) 662 663 fobj = open(path, "wb") 664 fobj.write("aaa") 665 fobj.close() 666 tarinfo = tar.gettarinfo(path) 667 self.assertEqual(tarinfo.size, 3) 668 669 tar.close() 670 671 def test_directory_size(self): 672 path = os.path.join(TEMPDIR, "directory") 673 os.mkdir(path) 674 try: 675 tar = tarfile.open(tmpname, self.mode) 676 tarinfo = tar.gettarinfo(path) 677 self.assertEqual(tarinfo.size, 0) 678 finally: 679 os.rmdir(path) 680 681 def test_link_size(self): 682 if hasattr(os, "link"): 683 link = os.path.join(TEMPDIR, "link") 684 target = os.path.join(TEMPDIR, "link_target") 685 fobj = open(target, "wb") 686 fobj.write("aaa") 687 fobj.close() 688 os.link(target, link) 689 try: 690 tar = tarfile.open(tmpname, self.mode) 691 # Record the link target in the inodes list. 692 tar.gettarinfo(target) 693 tarinfo = tar.gettarinfo(link) 694 self.assertEqual(tarinfo.size, 0) 695 finally: 696 os.remove(target) 697 os.remove(link) 698 699 def test_symlink_size(self): 700 if hasattr(os, "symlink"): 701 path = os.path.join(TEMPDIR, "symlink") 702 os.symlink("link_target", path) 703 try: 704 tar = tarfile.open(tmpname, self.mode) 705 tarinfo = tar.gettarinfo(path) 706 self.assertEqual(tarinfo.size, 0) 707 finally: 708 os.remove(path) 709 710 def test_add_self(self): 711 # Test for #1257255. 
712 dstname = os.path.abspath(tmpname) 713 714 tar = tarfile.open(tmpname, self.mode) 715 self.assertTrue(tar.name == dstname, "archive name must be absolute") 716 717 tar.add(dstname) 718 self.assertTrue(tar.getnames() == [], "added the archive to itself") 719 720 cwd = os.getcwd() 721 os.chdir(TEMPDIR) 722 tar.add(dstname) 723 os.chdir(cwd) 724 self.assertTrue(tar.getnames() == [], "added the archive to itself") 725 726 def test_exclude(self): 727 tempdir = os.path.join(TEMPDIR, "exclude") 728 os.mkdir(tempdir) 729 try: 730 for name in ("foo", "bar", "baz"): 731 name = os.path.join(tempdir, name) 732 open(name, "wb").close() 733 734 exclude = os.path.isfile 735 736 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 737 with test_support.check_warnings(("use the filter argument", 738 DeprecationWarning)): 739 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 740 tar.close() 741 742 tar = tarfile.open(tmpname, "r") 743 self.assertEqual(len(tar.getmembers()), 1) 744 self.assertEqual(tar.getnames()[0], "empty_dir") 745 finally: 746 shutil.rmtree(tempdir) 747 748 def test_filter(self): 749 tempdir = os.path.join(TEMPDIR, "filter") 750 os.mkdir(tempdir) 751 try: 752 for name in ("foo", "bar", "baz"): 753 name = os.path.join(tempdir, name) 754 open(name, "wb").close() 755 756 def filter(tarinfo): 757 if os.path.basename(tarinfo.name) == "bar": 758 return 759 tarinfo.uid = 123 760 tarinfo.uname = "foo" 761 return tarinfo 762 763 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 764 tar.add(tempdir, arcname="empty_dir", filter=filter) 765 tar.close() 766 767 tar = tarfile.open(tmpname, "r") 768 for tarinfo in tar: 769 self.assertEqual(tarinfo.uid, 123) 770 self.assertEqual(tarinfo.uname, "foo") 771 self.assertEqual(len(tar.getmembers()), 3) 772 tar.close() 773 finally: 774 shutil.rmtree(tempdir) 775 776 # Guarantee that stored pathnames are not modified. Don't 777 # remove ./ or ../ or double slashes. Still make absolute 778 # pathnames relative. 
779 # For details see bug #6054. 780 def _test_pathname(self, path, cmp_path=None, dir=False): 781 # Create a tarfile with an empty member named path 782 # and compare the stored name with the original. 783 foo = os.path.join(TEMPDIR, "foo") 784 if not dir: 785 open(foo, "w").close() 786 else: 787 os.mkdir(foo) 788 789 tar = tarfile.open(tmpname, self.mode) 790 tar.add(foo, arcname=path) 791 tar.close() 792 793 tar = tarfile.open(tmpname, "r") 794 t = tar.next() 795 tar.close() 796 797 if not dir: 798 os.remove(foo) 799 else: 800 os.rmdir(foo) 801 802 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 803 804 def test_pathnames(self): 805 self._test_pathname("foo") 806 self._test_pathname(os.path.join("foo", ".", "bar")) 807 self._test_pathname(os.path.join("foo", "..", "bar")) 808 self._test_pathname(os.path.join(".", "foo")) 809 self._test_pathname(os.path.join(".", "foo", ".")) 810 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 811 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 812 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 813 self._test_pathname(os.path.join("..", "foo")) 814 self._test_pathname(os.path.join("..", "foo", "..")) 815 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 816 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 817 818 self._test_pathname("foo" + os.sep + os.sep + "bar") 819 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 820 821 def test_abs_pathnames(self): 822 if sys.platform == "win32": 823 self._test_pathname("C:\\foo", "foo") 824 else: 825 self._test_pathname("/foo", "foo") 826 self._test_pathname("///foo", "foo") 827 828 def test_cwd(self): 829 # Test adding the current working directory. 830 cwd = os.getcwd() 831 os.chdir(TEMPDIR) 832 try: 833 open("foo", "w").close() 834 835 tar = tarfile.open(tmpname, self.mode) 836 tar.add(".") 837 tar.close() 838 839 tar = tarfile.open(tmpname, "r") 840 for t in tar: 841 self.assert_(t.name == "." 
or t.name.startswith("./")) 842 tar.close() 843 finally: 844 os.chdir(cwd) 845 846 @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink") 847 def test_extractall_symlinks(self): 848 # Test if extractall works properly when tarfile contains symlinks 849 tempdir = os.path.join(TEMPDIR, "testsymlinks") 850 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 851 os.mkdir(tempdir) 852 try: 853 source_file = os.path.join(tempdir,'source') 854 target_file = os.path.join(tempdir,'symlink') 855 with open(source_file,'w') as f: 856 f.write('something\n') 857 os.symlink(source_file, target_file) 858 tar = tarfile.open(temparchive,'w') 859 tar.add(source_file, arcname=os.path.basename(source_file)) 860 tar.add(target_file, arcname=os.path.basename(target_file)) 861 tar.close() 862 # Let's extract it to the location which contains the symlink 863 tar = tarfile.open(temparchive,'r') 864 # this should not raise OSError: [Errno 17] File exists 865 try: 866 tar.extractall(path=tempdir) 867 except OSError: 868 self.fail("extractall failed with symlinked files") 869 finally: 870 tar.close() 871 finally: 872 os.unlink(temparchive) 873 shutil.rmtree(tempdir) 874 875 @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink") 876 def test_extractall_broken_symlinks(self): 877 # Test if extractall works properly when tarfile contains broken 878 # symlinks 879 tempdir = os.path.join(TEMPDIR, "testsymlinks") 880 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 881 os.mkdir(tempdir) 882 try: 883 source_file = os.path.join(tempdir,'source') 884 target_file = os.path.join(tempdir,'symlink') 885 with open(source_file,'w') as f: 886 f.write('something\n') 887 os.symlink(source_file, target_file) 888 tar = tarfile.open(temparchive,'w') 889 tar.add(target_file, arcname=os.path.basename(target_file)) 890 tar.close() 891 # remove the real file 892 os.unlink(source_file) 893 # Let's extract it to the location which contains the symlink 894 tar = 
tarfile.open(temparchive,'r') 895 # this should not raise OSError: [Errno 17] File exists 896 try: 897 tar.extractall(path=tempdir) 898 except OSError: 899 self.fail("extractall failed with broken symlinked files") 900 finally: 901 tar.close() 902 finally: 903 os.unlink(temparchive) 904 shutil.rmtree(tempdir) 905 906 @unittest.skipUnless(hasattr(os, 'link'), "needs os.link") 907 def test_extractall_hardlinks(self): 908 # Test if extractall works properly when tarfile contains symlinks 909 tempdir = os.path.join(TEMPDIR, "testsymlinks") 910 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 911 os.mkdir(tempdir) 912 try: 913 source_file = os.path.join(tempdir,'source') 914 target_file = os.path.join(tempdir,'symlink') 915 with open(source_file,'w') as f: 916 f.write('something\n') 917 os.link(source_file, target_file) 918 tar = tarfile.open(temparchive,'w') 919 tar.add(source_file, arcname=os.path.basename(source_file)) 920 tar.add(target_file, arcname=os.path.basename(target_file)) 921 tar.close() 922 # Let's extract it to the location which contains the symlink 923 tar = tarfile.open(temparchive,'r') 924 # this should not raise OSError: [Errno 17] File exists 925 try: 926 tar.extractall(path=tempdir) 927 except OSError: 928 self.fail("extractall failed with linked files") 929 finally: 930 tar.close() 931 finally: 932 os.unlink(temparchive) 933 shutil.rmtree(tempdir) 934 935class StreamWriteTest(WriteTestBase): 936 937 mode = "w|" 938 939 def test_stream_padding(self): 940 # Test for bug #1543303. 
941 tar = tarfile.open(tmpname, self.mode) 942 tar.close() 943 944 if self.mode.endswith("gz"): 945 fobj = gzip.GzipFile(tmpname) 946 data = fobj.read() 947 fobj.close() 948 elif self.mode.endswith("bz2"): 949 dec = bz2.BZ2Decompressor() 950 data = open(tmpname, "rb").read() 951 data = dec.decompress(data) 952 self.assertTrue(len(dec.unused_data) == 0, 953 "found trailing data") 954 else: 955 fobj = open(tmpname, "rb") 956 data = fobj.read() 957 fobj.close() 958 959 self.assertTrue(data.count("\0") == tarfile.RECORDSIZE, 960 "incorrect zero padding") 961 962 def test_file_mode(self): 963 # Test for issue #8464: Create files with correct 964 # permissions. 965 if sys.platform == "win32" or not hasattr(os, "umask"): 966 return 967 968 if os.path.exists(tmpname): 969 os.remove(tmpname) 970 971 original_umask = os.umask(0022) 972 try: 973 tar = tarfile.open(tmpname, self.mode) 974 tar.close() 975 mode = os.stat(tmpname).st_mode & 0777 976 self.assertEqual(mode, 0644, "wrong file permissions") 977 finally: 978 os.umask(original_umask) 979 980 981class GNUWriteTest(unittest.TestCase): 982 # This testcase checks for correct creation of GNU Longname 983 # and Longlink extended headers (cp. bug #812325). 
984 985 def _length(self, s): 986 blocks, remainder = divmod(len(s) + 1, 512) 987 if remainder: 988 blocks += 1 989 return blocks * 512 990 991 def _calc_size(self, name, link=None): 992 # Initial tar header 993 count = 512 994 995 if len(name) > tarfile.LENGTH_NAME: 996 # GNU longname extended header + longname 997 count += 512 998 count += self._length(name) 999 if link is not None and len(link) > tarfile.LENGTH_LINK: 1000 # GNU longlink extended header + longlink 1001 count += 512 1002 count += self._length(link) 1003 return count 1004 1005 def _test(self, name, link=None): 1006 tarinfo = tarfile.TarInfo(name) 1007 if link: 1008 tarinfo.linkname = link 1009 tarinfo.type = tarfile.LNKTYPE 1010 1011 tar = tarfile.open(tmpname, "w") 1012 tar.format = tarfile.GNU_FORMAT 1013 tar.addfile(tarinfo) 1014 1015 v1 = self._calc_size(name, link) 1016 v2 = tar.offset 1017 self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") 1018 1019 tar.close() 1020 1021 tar = tarfile.open(tmpname) 1022 member = tar.next() 1023 self.assertIsNotNone(member, 1024 "unable to read longname member") 1025 self.assertEqual(tarinfo.name, member.name, 1026 "unable to read longname member") 1027 self.assertEqual(tarinfo.linkname, member.linkname, 1028 "unable to read longname member") 1029 1030 def test_longname_1023(self): 1031 self._test(("longnam/" * 127) + "longnam") 1032 1033 def test_longname_1024(self): 1034 self._test(("longnam/" * 127) + "longname") 1035 1036 def test_longname_1025(self): 1037 self._test(("longnam/" * 127) + "longname_") 1038 1039 def test_longlink_1023(self): 1040 self._test("name", ("longlnk/" * 127) + "longlnk") 1041 1042 def test_longlink_1024(self): 1043 self._test("name", ("longlnk/" * 127) + "longlink") 1044 1045 def test_longlink_1025(self): 1046 self._test("name", ("longlnk/" * 127) + "longlink_") 1047 1048 def test_longnamelink_1023(self): 1049 self._test(("longnam/" * 127) + "longnam", 1050 ("longlnk/" * 127) + "longlnk") 1051 1052 def 
test_longnamelink_1024(self): 1053 self._test(("longnam/" * 127) + "longname", 1054 ("longlnk/" * 127) + "longlink") 1055 1056 def test_longnamelink_1025(self): 1057 self._test(("longnam/" * 127) + "longname_", 1058 ("longlnk/" * 127) + "longlink_") 1059 1060 1061class HardlinkTest(unittest.TestCase): 1062 # Test the creation of LNKTYPE (hardlink) members in an archive. 1063 1064 def setUp(self): 1065 self.foo = os.path.join(TEMPDIR, "foo") 1066 self.bar = os.path.join(TEMPDIR, "bar") 1067 1068 fobj = open(self.foo, "wb") 1069 fobj.write("foo") 1070 fobj.close() 1071 1072 os.link(self.foo, self.bar) 1073 1074 self.tar = tarfile.open(tmpname, "w") 1075 self.tar.add(self.foo) 1076 1077 def tearDown(self): 1078 self.tar.close() 1079 os.remove(self.foo) 1080 os.remove(self.bar) 1081 1082 def test_add_twice(self): 1083 # The same name will be added as a REGTYPE every 1084 # time regardless of st_nlink. 1085 tarinfo = self.tar.gettarinfo(self.foo) 1086 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1087 "add file as regular failed") 1088 1089 def test_add_hardlink(self): 1090 tarinfo = self.tar.gettarinfo(self.bar) 1091 self.assertTrue(tarinfo.type == tarfile.LNKTYPE, 1092 "add file as hardlink failed") 1093 1094 def test_dereference_hardlink(self): 1095 self.tar.dereference = True 1096 tarinfo = self.tar.gettarinfo(self.bar) 1097 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1098 "dereferencing hardlink failed") 1099 1100 1101class PaxWriteTest(GNUWriteTest): 1102 1103 def _test(self, name, link=None): 1104 # See GNUWriteTest. 
1105 tarinfo = tarfile.TarInfo(name) 1106 if link: 1107 tarinfo.linkname = link 1108 tarinfo.type = tarfile.LNKTYPE 1109 1110 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1111 tar.addfile(tarinfo) 1112 tar.close() 1113 1114 tar = tarfile.open(tmpname) 1115 if link: 1116 l = tar.getmembers()[0].linkname 1117 self.assertTrue(link == l, "PAX longlink creation failed") 1118 else: 1119 n = tar.getmembers()[0].name 1120 self.assertTrue(name == n, "PAX longname creation failed") 1121 1122 def test_pax_global_header(self): 1123 pax_headers = { 1124 u"foo": u"bar", 1125 u"uid": u"0", 1126 u"mtime": u"1.23", 1127 u"test": u"���", 1128 u"���": u"test"} 1129 1130 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1131 pax_headers=pax_headers) 1132 tar.addfile(tarfile.TarInfo("test")) 1133 tar.close() 1134 1135 # Test if the global header was written correctly. 1136 tar = tarfile.open(tmpname, encoding="iso8859-1") 1137 self.assertEqual(tar.pax_headers, pax_headers) 1138 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1139 1140 # Test if all the fields are unicode. 1141 for key, val in tar.pax_headers.iteritems(): 1142 self.assertTrue(type(key) is unicode) 1143 self.assertTrue(type(val) is unicode) 1144 if key in tarfile.PAX_NUMBER_FIELDS: 1145 try: 1146 tarfile.PAX_NUMBER_FIELDS[key](val) 1147 except (TypeError, ValueError): 1148 self.fail("unable to convert pax header field") 1149 1150 def test_pax_extended_header(self): 1151 # The fields from the pax header have priority over the 1152 # TarInfo. 
1153 pax_headers = {u"path": u"foo", u"uid": u"123"} 1154 1155 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") 1156 t = tarfile.TarInfo() 1157 t.name = u"���" # non-ASCII 1158 t.uid = 8**8 # too large 1159 t.pax_headers = pax_headers 1160 tar.addfile(t) 1161 tar.close() 1162 1163 tar = tarfile.open(tmpname, encoding="iso8859-1") 1164 t = tar.getmembers()[0] 1165 self.assertEqual(t.pax_headers, pax_headers) 1166 self.assertEqual(t.name, "foo") 1167 self.assertEqual(t.uid, 123) 1168 1169 1170class UstarUnicodeTest(unittest.TestCase): 1171 # All *UnicodeTests FIXME 1172 1173 format = tarfile.USTAR_FORMAT 1174 1175 def test_iso8859_1_filename(self): 1176 self._test_unicode_filename("iso8859-1") 1177 1178 def test_utf7_filename(self): 1179 self._test_unicode_filename("utf7") 1180 1181 def test_utf8_filename(self): 1182 self._test_unicode_filename("utf8") 1183 1184 def _test_unicode_filename(self, encoding): 1185 tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") 1186 name = u"���" 1187 tar.addfile(tarfile.TarInfo(name)) 1188 tar.close() 1189 1190 tar = tarfile.open(tmpname, encoding=encoding) 1191 self.assertTrue(type(tar.getnames()[0]) is not unicode) 1192 self.assertEqual(tar.getmembers()[0].name, name.encode(encoding)) 1193 tar.close() 1194 1195 def test_unicode_filename_error(self): 1196 tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") 1197 tarinfo = tarfile.TarInfo() 1198 1199 tarinfo.name = "���" 1200 if self.format == tarfile.PAX_FORMAT: 1201 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1202 else: 1203 tar.addfile(tarinfo) 1204 1205 tarinfo.name = u"���" 1206 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1207 1208 tarinfo.name = "foo" 1209 tarinfo.uname = u"���" 1210 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1211 1212 def test_unicode_argument(self): 1213 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") 
1214 for t in tar: 1215 self.assertTrue(type(t.name) is str) 1216 self.assertTrue(type(t.linkname) is str) 1217 self.assertTrue(type(t.uname) is str) 1218 self.assertTrue(type(t.gname) is str) 1219 tar.close() 1220 1221 def test_uname_unicode(self): 1222 for name in (u"���", "���"): 1223 t = tarfile.TarInfo("foo") 1224 t.uname = name 1225 t.gname = name 1226 1227 fobj = StringIO.StringIO() 1228 tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1") 1229 tar.addfile(t) 1230 tar.close() 1231 fobj.seek(0) 1232 1233 tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1") 1234 t = tar.getmember("foo") 1235 self.assertEqual(t.uname, "���") 1236 self.assertEqual(t.gname, "���") 1237 1238 1239class GNUUnicodeTest(UstarUnicodeTest): 1240 1241 format = tarfile.GNU_FORMAT 1242 1243 1244class PaxUnicodeTest(UstarUnicodeTest): 1245 1246 format = tarfile.PAX_FORMAT 1247 1248 def _create_unicode_name(self, name): 1249 tar = tarfile.open(tmpname, "w", format=self.format) 1250 t = tarfile.TarInfo() 1251 t.pax_headers["path"] = name 1252 tar.addfile(t) 1253 tar.close() 1254 1255 def test_error_handlers(self): 1256 # Test if the unicode error handlers work correctly for characters 1257 # that cannot be expressed in a given encoding. 1258 self._create_unicode_name(u"���") 1259 1260 for handler, name in (("utf-8", u"���".encode("utf8")), 1261 ("replace", "???"), ("ignore", "")): 1262 tar = tarfile.open(tmpname, format=self.format, encoding="ascii", 1263 errors=handler) 1264 self.assertEqual(tar.getnames()[0], name) 1265 1266 self.assertRaises(UnicodeError, tarfile.open, tmpname, 1267 encoding="ascii", errors="strict") 1268 1269 def test_error_handler_utf8(self): 1270 # Create a pathname that has one component representable using 1271 # iso8859-1 and the other only in iso8859-15. 
1272 self._create_unicode_name(u"���/�") 1273 1274 tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1", 1275 errors="utf-8") 1276 self.assertEqual(tar.getnames()[0], "���/" + u"�".encode("utf8")) 1277 1278 1279class AppendTest(unittest.TestCase): 1280 # Test append mode (cp. patch #1652681). 1281 1282 def setUp(self): 1283 self.tarname = tmpname 1284 if os.path.exists(self.tarname): 1285 os.remove(self.tarname) 1286 1287 def _add_testfile(self, fileobj=None): 1288 tar = tarfile.open(self.tarname, "a", fileobj=fileobj) 1289 tar.addfile(tarfile.TarInfo("bar")) 1290 tar.close() 1291 1292 def _create_testtar(self, mode="w:"): 1293 src = tarfile.open(tarname, encoding="iso8859-1") 1294 t = src.getmember("ustar/regtype") 1295 t.name = "foo" 1296 f = src.extractfile(t) 1297 tar = tarfile.open(self.tarname, mode) 1298 tar.addfile(t, f) 1299 tar.close() 1300 1301 def _test(self, names=["bar"], fileobj=None): 1302 tar = tarfile.open(self.tarname, fileobj=fileobj) 1303 self.assertEqual(tar.getnames(), names) 1304 1305 def test_non_existing(self): 1306 self._add_testfile() 1307 self._test() 1308 1309 def test_empty(self): 1310 tarfile.open(self.tarname, "w:").close() 1311 self._add_testfile() 1312 self._test() 1313 1314 def test_empty_fileobj(self): 1315 fobj = StringIO.StringIO("\0" * 1024) 1316 self._add_testfile(fobj) 1317 fobj.seek(0) 1318 self._test(fileobj=fobj) 1319 1320 def test_fileobj(self): 1321 self._create_testtar() 1322 data = open(self.tarname).read() 1323 fobj = StringIO.StringIO(data) 1324 self._add_testfile(fobj) 1325 fobj.seek(0) 1326 self._test(names=["foo", "bar"], fileobj=fobj) 1327 1328 def test_existing(self): 1329 self._create_testtar() 1330 self._add_testfile() 1331 self._test(names=["foo", "bar"]) 1332 1333 def test_append_gz(self): 1334 if gzip is None: 1335 return 1336 self._create_testtar("w:gz") 1337 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1338 1339 def test_append_bz2(self): 1340 if bz2 is None: 1341 
return 1342 self._create_testtar("w:bz2") 1343 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1344 1345 # Append mode is supposed to fail if the tarfile to append to 1346 # does not end with a zero block. 1347 def _test_error(self, data): 1348 open(self.tarname, "wb").write(data) 1349 self.assertRaises(tarfile.ReadError, self._add_testfile) 1350 1351 def test_null(self): 1352 self._test_error("") 1353 1354 def test_incomplete(self): 1355 self._test_error("\0" * 13) 1356 1357 def test_premature_eof(self): 1358 data = tarfile.TarInfo("foo").tobuf() 1359 self._test_error(data) 1360 1361 def test_trailing_garbage(self): 1362 data = tarfile.TarInfo("foo").tobuf() 1363 self._test_error(data + "\0" * 13) 1364 1365 def test_invalid(self): 1366 self._test_error("a" * 512) 1367 1368 1369class LimitsTest(unittest.TestCase): 1370 1371 def test_ustar_limits(self): 1372 # 100 char name 1373 tarinfo = tarfile.TarInfo("0123456789" * 10) 1374 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1375 1376 # 101 char name that cannot be stored 1377 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1378 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1379 1380 # 256 char name with a slash at pos 156 1381 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1382 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1383 1384 # 256 char name that cannot be stored 1385 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1386 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1387 1388 # 512 char name 1389 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1390 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1391 1392 # 512 char linkname 1393 tarinfo = tarfile.TarInfo("longlink") 1394 tarinfo.linkname = "123/" * 126 + "longname" 1395 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1396 1397 # uid > 8 digits 1398 tarinfo = tarfile.TarInfo("name") 1399 tarinfo.uid = 010000000 1400 self.assertRaises(ValueError, tarinfo.tobuf, 
tarfile.USTAR_FORMAT) 1401 1402 def test_gnu_limits(self): 1403 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1404 tarinfo.tobuf(tarfile.GNU_FORMAT) 1405 1406 tarinfo = tarfile.TarInfo("longlink") 1407 tarinfo.linkname = "123/" * 126 + "longname" 1408 tarinfo.tobuf(tarfile.GNU_FORMAT) 1409 1410 # uid >= 256 ** 7 1411 tarinfo = tarfile.TarInfo("name") 1412 tarinfo.uid = 04000000000000000000L 1413 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1414 1415 def test_pax_limits(self): 1416 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1417 tarinfo.tobuf(tarfile.PAX_FORMAT) 1418 1419 tarinfo = tarfile.TarInfo("longlink") 1420 tarinfo.linkname = "123/" * 126 + "longname" 1421 tarinfo.tobuf(tarfile.PAX_FORMAT) 1422 1423 tarinfo = tarfile.TarInfo("name") 1424 tarinfo.uid = 04000000000000000000L 1425 tarinfo.tobuf(tarfile.PAX_FORMAT) 1426 1427 1428class ContextManagerTest(unittest.TestCase): 1429 1430 def test_basic(self): 1431 with tarfile.open(tarname) as tar: 1432 self.assertFalse(tar.closed, "closed inside runtime context") 1433 self.assertTrue(tar.closed, "context manager failed") 1434 1435 def test_closed(self): 1436 # The __enter__() method is supposed to raise IOError 1437 # if the TarFile object is already closed. 1438 tar = tarfile.open(tarname) 1439 tar.close() 1440 with self.assertRaises(IOError): 1441 with tar: 1442 pass 1443 1444 def test_exception(self): 1445 # Test if the IOError exception is passed through properly. 1446 with self.assertRaises(Exception) as exc: 1447 with tarfile.open(tarname) as tar: 1448 raise IOError 1449 self.assertIsInstance(exc.exception, IOError, 1450 "wrong exception raised in context manager") 1451 self.assertTrue(tar.closed, "context manager failed") 1452 1453 def test_no_eof(self): 1454 # __exit__() must not write end-of-archive blocks if an 1455 # exception was raised. 
1456 try: 1457 with tarfile.open(tmpname, "w") as tar: 1458 raise Exception 1459 except: 1460 pass 1461 self.assertEqual(os.path.getsize(tmpname), 0, 1462 "context manager wrote an end-of-archive block") 1463 self.assertTrue(tar.closed, "context manager failed") 1464 1465 def test_eof(self): 1466 # __exit__() must write end-of-archive blocks, i.e. call 1467 # TarFile.close() if there was no error. 1468 with tarfile.open(tmpname, "w"): 1469 pass 1470 self.assertNotEqual(os.path.getsize(tmpname), 0, 1471 "context manager wrote no end-of-archive block") 1472 1473 def test_fileobj(self): 1474 # Test that __exit__() did not close the external file 1475 # object. 1476 fobj = open(tmpname, "wb") 1477 try: 1478 with tarfile.open(fileobj=fobj, mode="w") as tar: 1479 raise Exception 1480 except: 1481 pass 1482 self.assertFalse(fobj.closed, "external file object was closed") 1483 self.assertTrue(tar.closed, "context manager failed") 1484 fobj.close() 1485 1486 1487class LinkEmulationTest(ReadTest): 1488 1489 # Test for issue #8741 regression. On platforms that do not support 1490 # symbolic or hard links tarfile tries to extract these types of members as 1491 # the regular files they point to. 
1492 def _test_link_extraction(self, name): 1493 self.tar.extract(name, TEMPDIR) 1494 data = open(os.path.join(TEMPDIR, name), "rb").read() 1495 self.assertEqual(md5sum(data), md5_regtype) 1496 1497 def test_hardlink_extraction1(self): 1498 self._test_link_extraction("ustar/lnktype") 1499 1500 def test_hardlink_extraction2(self): 1501 self._test_link_extraction("./ustar/linktest2/lnktype") 1502 1503 def test_symlink_extraction1(self): 1504 self._test_link_extraction("ustar/symtype") 1505 1506 def test_symlink_extraction2(self): 1507 self._test_link_extraction("./ustar/linktest2/symtype") 1508 1509 1510class GzipMiscReadTest(MiscReadTest): 1511 tarname = gzipname 1512 mode = "r:gz" 1513class GzipUstarReadTest(UstarReadTest): 1514 tarname = gzipname 1515 mode = "r:gz" 1516class GzipStreamReadTest(StreamReadTest): 1517 tarname = gzipname 1518 mode = "r|gz" 1519class GzipWriteTest(WriteTest): 1520 mode = "w:gz" 1521class GzipStreamWriteTest(StreamWriteTest): 1522 mode = "w|gz" 1523 1524 1525class Bz2MiscReadTest(MiscReadTest): 1526 tarname = bz2name 1527 mode = "r:bz2" 1528class Bz2UstarReadTest(UstarReadTest): 1529 tarname = bz2name 1530 mode = "r:bz2" 1531class Bz2StreamReadTest(StreamReadTest): 1532 tarname = bz2name 1533 mode = "r|bz2" 1534class Bz2WriteTest(WriteTest): 1535 mode = "w:bz2" 1536class Bz2StreamWriteTest(StreamWriteTest): 1537 mode = "w|bz2" 1538 1539class Bz2PartialReadTest(unittest.TestCase): 1540 # Issue5068: The _BZ2Proxy.read() method loops forever 1541 # on an empty or partial bzipped file. 
1542 1543 def _test_partial_input(self, mode): 1544 class MyStringIO(StringIO.StringIO): 1545 hit_eof = False 1546 def read(self, n): 1547 if self.hit_eof: 1548 raise AssertionError("infinite loop detected in tarfile.open()") 1549 self.hit_eof = self.pos == self.len 1550 return StringIO.StringIO.read(self, n) 1551 def seek(self, *args): 1552 self.hit_eof = False 1553 return StringIO.StringIO.seek(self, *args) 1554 1555 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1556 for x in range(len(data) + 1): 1557 try: 1558 tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode) 1559 except tarfile.ReadError: 1560 pass # we have no interest in ReadErrors 1561 1562 def test_partial_input(self): 1563 self._test_partial_input("r") 1564 1565 def test_partial_input_bz2(self): 1566 self._test_partial_input("r:bz2") 1567 1568 1569def test_main(): 1570 os.makedirs(TEMPDIR) 1571 1572 tests = [ 1573 UstarReadTest, 1574 MiscReadTest, 1575 StreamReadTest, 1576 DetectReadTest, 1577 MemberReadTest, 1578 GNUReadTest, 1579 PaxReadTest, 1580 WriteTest, 1581 StreamWriteTest, 1582 GNUWriteTest, 1583 PaxWriteTest, 1584 UstarUnicodeTest, 1585 GNUUnicodeTest, 1586 PaxUnicodeTest, 1587 AppendTest, 1588 LimitsTest, 1589 ContextManagerTest, 1590 ] 1591 1592 if hasattr(os, "link"): 1593 tests.append(HardlinkTest) 1594 else: 1595 tests.append(LinkEmulationTest) 1596 1597 fobj = open(tarname, "rb") 1598 data = fobj.read() 1599 fobj.close() 1600 1601 if gzip: 1602 # Create testtar.tar.gz and add gzip-specific tests. 1603 tar = gzip.open(gzipname, "wb") 1604 tar.write(data) 1605 tar.close() 1606 1607 tests += [ 1608 GzipMiscReadTest, 1609 GzipUstarReadTest, 1610 GzipStreamReadTest, 1611 GzipWriteTest, 1612 GzipStreamWriteTest, 1613 ] 1614 1615 if bz2: 1616 # Create testtar.tar.bz2 and add bz2-specific tests. 
1617 tar = bz2.BZ2File(bz2name, "wb") 1618 tar.write(data) 1619 tar.close() 1620 1621 tests += [ 1622 Bz2MiscReadTest, 1623 Bz2UstarReadTest, 1624 Bz2StreamReadTest, 1625 Bz2WriteTest, 1626 Bz2StreamWriteTest, 1627 Bz2PartialReadTest, 1628 ] 1629 1630 try: 1631 test_support.run_unittest(*tests) 1632 finally: 1633 if os.path.exists(TEMPDIR): 1634 shutil.rmtree(TEMPDIR) 1635 1636if __name__ == "__main__": 1637 test_main() 1638