1"""Utility functions for copying and archiving files and directory trees. 2 3XXX The functions here don't copy the resource fork or other metadata on Mac. 4 5""" 6 7import os 8import sys 9import stat 10import fnmatch 11import collections 12import errno 13 14try: 15 import zlib 16 del zlib 17 _ZLIB_SUPPORTED = True 18except ImportError: 19 _ZLIB_SUPPORTED = False 20 21try: 22 import bz2 23 del bz2 24 _BZ2_SUPPORTED = True 25except ImportError: 26 _BZ2_SUPPORTED = False 27 28try: 29 import lzma 30 del lzma 31 _LZMA_SUPPORTED = True 32except ImportError: 33 _LZMA_SUPPORTED = False 34 35try: 36 from pwd import getpwnam 37except ImportError: 38 getpwnam = None 39 40try: 41 from grp import getgrnam 42except ImportError: 43 getgrnam = None 44 45__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2", 46 "copytree", "move", "rmtree", "Error", "SpecialFileError", 47 "ExecError", "make_archive", "get_archive_formats", 48 "register_archive_format", "unregister_archive_format", 49 "get_unpack_formats", "register_unpack_format", 50 "unregister_unpack_format", "unpack_archive", 51 "ignore_patterns", "chown", "which", "get_terminal_size", 52 "SameFileError"] 53 # disk_usage is added later, if available on the platform 54 55class Error(OSError): 56 pass 57 58class SameFileError(Error): 59 """Raised when source and destination are the same file.""" 60 61class SpecialFileError(OSError): 62 """Raised when trying to do a kind of operation (e.g. copying) which is 63 not supported on a special file (e.g. a named pipe)""" 64 65class ExecError(OSError): 66 """Raised when a command could not be executed""" 67 68class ReadError(OSError): 69 """Raised when an archive cannot be read""" 70 71class RegistryError(Exception): 72 """Raised when a registry operation with the archiving 73 and unpacking registries fails""" 74 75 76def copyfileobj(fsrc, fdst, length=16*1024): 77 """copy data from file-like object fsrc to file-like object fdst""" 78 while 1: 79 buf = fsrc.read(length) 80 if not buf: 81 break 82 fdst.write(buf) 83 84def _samefile(src, dst): 85 # Macintosh, Unix. 86 if hasattr(os.path, 'samefile'): 87 try: 88 return os.path.samefile(src, dst) 89 except OSError: 90 return False 91 92 # All other platforms: check for same pathname. 93 return (os.path.normcase(os.path.abspath(src)) == 94 os.path.normcase(os.path.abspath(dst))) 95 96def copyfile(src, dst, *, follow_symlinks=True): 97 """Copy data from src to dst. 98 99 If follow_symlinks is not set and src is a symbolic link, a new 100 symlink will be created instead of copying the file it points to. 101 102 """ 103 if _samefile(src, dst): 104 raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) 105 106 for fn in [src, dst]: 107 try: 108 st = os.stat(fn) 109 except OSError: 110 # File most likely does not exist 111 pass 112 else: 113 # XXX What about other special files? (sockets, devices...) 114 if stat.S_ISFIFO(st.st_mode): 115 raise SpecialFileError("`%s` is a named pipe" % fn) 116 117 if not follow_symlinks and os.path.islink(src): 118 os.symlink(os.readlink(src), dst) 119 else: 120 with open(src, 'rb') as fsrc: 121 with open(dst, 'wb') as fdst: 122 copyfileobj(fsrc, fdst) 123 return dst 124 125def copymode(src, dst, *, follow_symlinks=True): 126 """Copy mode bits from src to dst. 127 128 If follow_symlinks is not set, symlinks aren't followed if and only 129 if both `src` and `dst` are symlinks. If `lchmod` isn't available 130 (e.g. Linux) this method does nothing. 131 132 """ 133 if not follow_symlinks and os.path.islink(src) and os.path.islink(dst): 134 if hasattr(os, 'lchmod'): 135 stat_func, chmod_func = os.lstat, os.lchmod 136 else: 137 return 138 elif hasattr(os, 'chmod'): 139 stat_func, chmod_func = os.stat, os.chmod 140 else: 141 return 142 143 st = stat_func(src) 144 chmod_func(dst, stat.S_IMODE(st.st_mode)) 145 146if hasattr(os, 'listxattr'): 147 def _copyxattr(src, dst, *, follow_symlinks=True): 148 """Copy extended filesystem attributes from `src` to `dst`. 149 150 Overwrite existing attributes. 151 152 If `follow_symlinks` is false, symlinks won't be followed. 153 154 """ 155 156 try: 157 names = os.listxattr(src, follow_symlinks=follow_symlinks) 158 except OSError as e: 159 if e.errno not in (errno.ENOTSUP, errno.ENODATA): 160 raise 161 return 162 for name in names: 163 try: 164 value = os.getxattr(src, name, follow_symlinks=follow_symlinks) 165 os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) 166 except OSError as e: 167 if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA): 168 raise 169else: 170 def _copyxattr(*args, **kwargs): 171 pass 172 173def copystat(src, dst, *, follow_symlinks=True): 174 """Copy file metadata 175 176 Copy the permission bits, last access time, last modification time, and 177 flags from `src` to `dst`. On Linux, copystat() also copies the "extended 178 attributes" where possible. The file contents, owner, and group are 179 unaffected. `src` and `dst` are path names given as strings. 180 181 If the optional flag `follow_symlinks` is not set, symlinks aren't 182 followed if and only if both `src` and `dst` are symlinks. 183 """ 184 def _nop(*args, ns=None, follow_symlinks=None): 185 pass 186 187 # follow symlinks (aka don't not follow symlinks) 188 follow = follow_symlinks or not (os.path.islink(src) and os.path.islink(dst)) 189 if follow: 190 # use the real function if it exists 191 def lookup(name): 192 return getattr(os, name, _nop) 193 else: 194 # use the real function only if it exists 195 # *and* it supports follow_symlinks 196 def lookup(name): 197 fn = getattr(os, name, _nop) 198 if fn in os.supports_follow_symlinks: 199 return fn 200 return _nop 201 202 st = lookup("stat")(src, follow_symlinks=follow) 203 mode = stat.S_IMODE(st.st_mode) 204 lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), 205 follow_symlinks=follow) 206 try: 207 lookup("chmod")(dst, mode, follow_symlinks=follow) 208 except NotImplementedError: 209 # if we got a NotImplementedError, it's because 210 # * follow_symlinks=False, 211 # * lchown() is unavailable, and 212 # * either 213 # * fchownat() is unavailable or 214 # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. 215 # (it returned ENOSUP.) 216 # therefore we're out of options--we simply cannot chown the 217 # symlink. give up, suppress the error. 218 # (which is what shutil always did in this circumstance.) 219 pass 220 if hasattr(st, 'st_flags'): 221 try: 222 lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) 223 except OSError as why: 224 for err in 'EOPNOTSUPP', 'ENOTSUP': 225 if hasattr(errno, err) and why.errno == getattr(errno, err): 226 break 227 else: 228 raise 229 _copyxattr(src, dst, follow_symlinks=follow) 230 231def copy(src, dst, *, follow_symlinks=True): 232 """Copy data and mode bits ("cp src dst"). Return the file's destination. 233 234 The destination may be a directory. 235 236 If follow_symlinks is false, symlinks won't be followed. This 237 resembles GNU's "cp -P src dst". 238 239 If source and destination are the same file, a SameFileError will be 240 raised. 241 242 """ 243 if os.path.isdir(dst): 244 dst = os.path.join(dst, os.path.basename(src)) 245 copyfile(src, dst, follow_symlinks=follow_symlinks) 246 copymode(src, dst, follow_symlinks=follow_symlinks) 247 return dst 248 249def copy2(src, dst, *, follow_symlinks=True): 250 """Copy data and metadata. Return the file's destination. 251 252 Metadata is copied with copystat(). Please see the copystat function 253 for more information. 254 255 The destination may be a directory. 256 257 If follow_symlinks is false, symlinks won't be followed. This 258 resembles GNU's "cp -P src dst". 259 260 """ 261 if os.path.isdir(dst): 262 dst = os.path.join(dst, os.path.basename(src)) 263 copyfile(src, dst, follow_symlinks=follow_symlinks) 264 copystat(src, dst, follow_symlinks=follow_symlinks) 265 return dst 266 267def ignore_patterns(*patterns): 268 """Function that can be used as copytree() ignore parameter. 269 270 Patterns is a sequence of glob-style patterns 271 that are used to exclude files""" 272 def _ignore_patterns(path, names): 273 ignored_names = [] 274 for pattern in patterns: 275 ignored_names.extend(fnmatch.filter(names, pattern)) 276 return set(ignored_names) 277 return _ignore_patterns 278 279def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, 280 ignore_dangling_symlinks=False): 281 """Recursively copy a directory tree. 282 283 The destination directory must not already exist. 284 If exception(s) occur, an Error is raised with a list of reasons. 285 286 If the optional symlinks flag is true, symbolic links in the 287 source tree result in symbolic links in the destination tree; if 288 it is false, the contents of the files pointed to by symbolic 289 links are copied. If the file pointed by the symlink doesn't 290 exist, an exception will be added in the list of errors raised in 291 an Error exception at the end of the copy process. 292 293 You can set the optional ignore_dangling_symlinks flag to true if you 294 want to silence this exception. Notice that this has no effect on 295 platforms that don't support os.symlink. 296 297 The optional ignore argument is a callable. If given, it 298 is called with the `src` parameter, which is the directory 299 being visited by copytree(), and `names` which is the list of 300 `src` contents, as returned by os.listdir(): 301 302 callable(src, names) -> ignored_names 303 304 Since copytree() is called recursively, the callable will be 305 called once for each directory that is copied. It returns a 306 list of names relative to the `src` directory that should 307 not be copied. 308 309 The optional copy_function argument is a callable that will be used 310 to copy each file. It will be called with the source path and the 311 destination path as arguments. By default, copy2() is used, but any 312 function that supports the same signature (like copy()) can be used. 313 314 """ 315 names = os.listdir(src) 316 if ignore is not None: 317 ignored_names = ignore(src, names) 318 else: 319 ignored_names = set() 320 321 os.makedirs(dst) 322 errors = [] 323 for name in names: 324 if name in ignored_names: 325 continue 326 srcname = os.path.join(src, name) 327 dstname = os.path.join(dst, name) 328 try: 329 if os.path.islink(srcname): 330 linkto = os.readlink(srcname) 331 if symlinks: 332 # We can't just leave it to `copy_function` because legacy 333 # code with a custom `copy_function` may rely on copytree 334 # doing the right thing. 335 os.symlink(linkto, dstname) 336 copystat(srcname, dstname, follow_symlinks=not symlinks) 337 else: 338 # ignore dangling symlink if the flag is on 339 if not os.path.exists(linkto) and ignore_dangling_symlinks: 340 continue 341 # otherwise let the copy occurs. copy2 will raise an error 342 if os.path.isdir(srcname): 343 copytree(srcname, dstname, symlinks, ignore, 344 copy_function) 345 else: 346 copy_function(srcname, dstname) 347 elif os.path.isdir(srcname): 348 copytree(srcname, dstname, symlinks, ignore, copy_function) 349 else: 350 # Will raise a SpecialFileError for unsupported file types 351 copy_function(srcname, dstname) 352 # catch the Error from the recursive copytree so that we can 353 # continue with other files 354 except Error as err: 355 errors.extend(err.args[0]) 356 except OSError as why: 357 errors.append((srcname, dstname, str(why))) 358 try: 359 copystat(src, dst) 360 except OSError as why: 361 # Copying file access times may fail on Windows 362 if getattr(why, 'winerror', None) is None: 363 errors.append((src, dst, str(why))) 364 if errors: 365 raise Error(errors) 366 return dst 367 368# version vulnerable to race conditions 369def _rmtree_unsafe(path, onerror): 370 try: 371 with os.scandir(path) as scandir_it: 372 entries = list(scandir_it) 373 except OSError: 374 onerror(os.scandir, path, sys.exc_info()) 375 entries = [] 376 for entry in entries: 377 fullname = entry.path 378 try: 379 is_dir = entry.is_dir(follow_symlinks=False) 380 except OSError: 381 is_dir = False 382 if is_dir: 383 try: 384 if entry.is_symlink(): 385 # This can only happen if someone replaces 386 # a directory with a symlink after the call to 387 # os.scandir or entry.is_dir above. 388 raise OSError("Cannot call rmtree on a symbolic link") 389 except OSError: 390 onerror(os.path.islink, fullname, sys.exc_info()) 391 continue 392 _rmtree_unsafe(fullname, onerror) 393 else: 394 try: 395 os.unlink(fullname) 396 except OSError: 397 onerror(os.unlink, fullname, sys.exc_info()) 398 try: 399 os.rmdir(path) 400 except OSError: 401 onerror(os.rmdir, path, sys.exc_info()) 402 403# Version using fd-based APIs to protect against races 404def _rmtree_safe_fd(topfd, path, onerror): 405 try: 406 with os.scandir(topfd) as scandir_it: 407 entries = list(scandir_it) 408 except OSError as err: 409 err.filename = path 410 onerror(os.scandir, path, sys.exc_info()) 411 return 412 for entry in entries: 413 fullname = os.path.join(path, entry.name) 414 try: 415 is_dir = entry.is_dir(follow_symlinks=False) 416 if is_dir: 417 orig_st = entry.stat(follow_symlinks=False) 418 is_dir = stat.S_ISDIR(orig_st.st_mode) 419 except OSError: 420 is_dir = False 421 if is_dir: 422 try: 423 dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd) 424 except OSError: 425 onerror(os.open, fullname, sys.exc_info()) 426 else: 427 try: 428 if os.path.samestat(orig_st, os.fstat(dirfd)): 429 _rmtree_safe_fd(dirfd, fullname, onerror) 430 try: 431 os.rmdir(entry.name, dir_fd=topfd) 432 except OSError: 433 onerror(os.rmdir, fullname, sys.exc_info()) 434 else: 435 try: 436 # This can only happen if someone replaces 437 # a directory with a symlink after the call to 438 # os.scandir or stat.S_ISDIR above. 439 raise OSError("Cannot call rmtree on a symbolic " 440 "link") 441 except OSError: 442 onerror(os.path.islink, fullname, sys.exc_info()) 443 finally: 444 os.close(dirfd) 445 else: 446 try: 447 os.unlink(entry.name, dir_fd=topfd) 448 except OSError: 449 onerror(os.unlink, fullname, sys.exc_info()) 450 451_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 452 os.supports_dir_fd and 453 os.scandir in os.supports_fd and 454 os.stat in os.supports_follow_symlinks) 455 456def rmtree(path, ignore_errors=False, onerror=None): 457 """Recursively delete a directory tree. 458 459 If ignore_errors is set, errors are ignored; otherwise, if onerror 460 is set, it is called to handle the error with arguments (func, 461 path, exc_info) where func is platform and implementation dependent; 462 path is the argument to that function that caused it to fail; and 463 exc_info is a tuple returned by sys.exc_info(). If ignore_errors 464 is false and onerror is None, an exception is raised. 465 466 """ 467 if ignore_errors: 468 def onerror(*args): 469 pass 470 elif onerror is None: 471 def onerror(*args): 472 raise 473 if _use_fd_functions: 474 # While the unsafe rmtree works fine on bytes, the fd based does not. 475 if isinstance(path, bytes): 476 path = os.fsdecode(path) 477 # Note: To guard against symlink races, we use the standard 478 # lstat()/open()/fstat() trick. 479 try: 480 orig_st = os.lstat(path) 481 except Exception: 482 onerror(os.lstat, path, sys.exc_info()) 483 return 484 try: 485 fd = os.open(path, os.O_RDONLY) 486 except Exception: 487 onerror(os.lstat, path, sys.exc_info()) 488 return 489 try: 490 if os.path.samestat(orig_st, os.fstat(fd)): 491 _rmtree_safe_fd(fd, path, onerror) 492 try: 493 os.rmdir(path) 494 except OSError: 495 onerror(os.rmdir, path, sys.exc_info()) 496 else: 497 try: 498 # symlinks to directories are forbidden, see bug #1669 499 raise OSError("Cannot call rmtree on a symbolic link") 500 except OSError: 501 onerror(os.path.islink, path, sys.exc_info()) 502 finally: 503 os.close(fd) 504 else: 505 try: 506 if os.path.islink(path): 507 # symlinks to directories are forbidden, see bug #1669 508 raise OSError("Cannot call rmtree on a symbolic link") 509 except OSError: 510 onerror(os.path.islink, path, sys.exc_info()) 511 # can't continue even if onerror hook returns 512 return 513 return _rmtree_unsafe(path, onerror) 514 515# Allow introspection of whether or not the hardening against symlink 516# attacks is supported on the current platform 517rmtree.avoids_symlink_attacks = _use_fd_functions 518 519def _basename(path): 520 # A basename() variant which first strips the trailing slash, if present. 521 # Thus we always get the last component of the path, even for directories. 522 sep = os.path.sep + (os.path.altsep or '') 523 return os.path.basename(path.rstrip(sep)) 524 525def move(src, dst, copy_function=copy2): 526 """Recursively move a file or directory to another location. This is 527 similar to the Unix "mv" command. Return the file or directory's 528 destination. 529 530 If the destination is a directory or a symlink to a directory, the source 531 is moved inside the directory. The destination path must not already 532 exist. 533 534 If the destination already exists but is not a directory, it may be 535 overwritten depending on os.rename() semantics. 536 537 If the destination is on our current filesystem, then rename() is used. 538 Otherwise, src is copied to the destination and then removed. Symlinks are 539 recreated under the new name if os.rename() fails because of cross 540 filesystem renames. 541 542 The optional `copy_function` argument is a callable that will be used 543 to copy the source or it will be delegated to `copytree`. 544 By default, copy2() is used, but any function that supports the same 545 signature (like copy()) can be used. 546 547 A lot more could be done here... A look at a mv.c shows a lot of 548 the issues this implementation glosses over. 549 550 """ 551 real_dst = dst 552 if os.path.isdir(dst): 553 if _samefile(src, dst): 554 # We might be on a case insensitive filesystem, 555 # perform the rename anyway. 556 os.rename(src, dst) 557 return 558 559 real_dst = os.path.join(dst, _basename(src)) 560 if os.path.exists(real_dst): 561 raise Error("Destination path '%s' already exists" % real_dst) 562 try: 563 os.rename(src, real_dst) 564 except OSError: 565 if os.path.islink(src): 566 linkto = os.readlink(src) 567 os.symlink(linkto, real_dst) 568 os.unlink(src) 569 elif os.path.isdir(src): 570 if _destinsrc(src, dst): 571 raise Error("Cannot move a directory '%s' into itself" 572 " '%s'." % (src, dst)) 573 copytree(src, real_dst, copy_function=copy_function, 574 symlinks=True) 575 rmtree(src) 576 else: 577 copy_function(src, real_dst) 578 os.unlink(src) 579 return real_dst 580 581def _destinsrc(src, dst): 582 src = os.path.abspath(src) 583 dst = os.path.abspath(dst) 584 if not src.endswith(os.path.sep): 585 src += os.path.sep 586 if not dst.endswith(os.path.sep): 587 dst += os.path.sep 588 return dst.startswith(src) 589 590def _get_gid(name): 591 """Returns a gid, given a group name.""" 592 if getgrnam is None or name is None: 593 return None 594 try: 595 result = getgrnam(name) 596 except KeyError: 597 result = None 598 if result is not None: 599 return result[2] 600 return None 601 602def _get_uid(name): 603 """Returns an uid, given a user name.""" 604 if getpwnam is None or name is None: 605 return None 606 try: 607 result = getpwnam(name) 608 except KeyError: 609 result = None 610 if result is not None: 611 return result[2] 612 return None 613 614def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0, 615 owner=None, group=None, logger=None): 616 """Create a (possibly compressed) tar file from all the files under 617 'base_dir'. 618 619 'compress' must be "gzip" (the default), "bzip2", "xz", or None. 620 621 'owner' and 'group' can be used to define an owner and a group for the 622 archive that is being built. If not provided, the current owner and group 623 will be used. 624 625 The output tar file will be named 'base_name' + ".tar", possibly plus 626 the appropriate compression extension (".gz", ".bz2", or ".xz"). 627 628 Returns the output filename. 629 """ 630 if compress is None: 631 tar_compression = '' 632 elif _ZLIB_SUPPORTED and compress == 'gzip': 633 tar_compression = 'gz' 634 elif _BZ2_SUPPORTED and compress == 'bzip2': 635 tar_compression = 'bz2' 636 elif _LZMA_SUPPORTED and compress == 'xz': 637 tar_compression = 'xz' 638 else: 639 raise ValueError("bad value for 'compress', or compression format not " 640 "supported : {0}".format(compress)) 641 642 import tarfile # late import for breaking circular dependency 643 644 compress_ext = '.' + tar_compression if compress else '' 645 archive_name = base_name + '.tar' + compress_ext 646 archive_dir = os.path.dirname(archive_name) 647 648 if archive_dir and not os.path.exists(archive_dir): 649 if logger is not None: 650 logger.info("creating %s", archive_dir) 651 if not dry_run: 652 os.makedirs(archive_dir) 653 654 # creating the tarball 655 if logger is not None: 656 logger.info('Creating tar archive') 657 658 uid = _get_uid(owner) 659 gid = _get_gid(group) 660 661 def _set_uid_gid(tarinfo): 662 if gid is not None: 663 tarinfo.gid = gid 664 tarinfo.gname = group 665 if uid is not None: 666 tarinfo.uid = uid 667 tarinfo.uname = owner 668 return tarinfo 669 670 if not dry_run: 671 tar = tarfile.open(archive_name, 'w|%s' % tar_compression) 672 try: 673 tar.add(base_dir, filter=_set_uid_gid) 674 finally: 675 tar.close() 676 677 return archive_name 678 679def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): 680 """Create a zip file from all the files under 'base_dir'. 681 682 The output zip file will be named 'base_name' + ".zip". Returns the 683 name of the output zip file. 684 """ 685 import zipfile # late import for breaking circular dependency 686 687 zip_filename = base_name + ".zip" 688 archive_dir = os.path.dirname(base_name) 689 690 if archive_dir and not os.path.exists(archive_dir): 691 if logger is not None: 692 logger.info("creating %s", archive_dir) 693 if not dry_run: 694 os.makedirs(archive_dir) 695 696 if logger is not None: 697 logger.info("creating '%s' and adding '%s' to it", 698 zip_filename, base_dir) 699 700 if not dry_run: 701 with zipfile.ZipFile(zip_filename, "w", 702 compression=zipfile.ZIP_DEFLATED) as zf: 703 path = os.path.normpath(base_dir) 704 if path != os.curdir: 705 zf.write(path, path) 706 if logger is not None: 707 logger.info("adding '%s'", path) 708 for dirpath, dirnames, filenames in os.walk(base_dir): 709 for name in sorted(dirnames): 710 path = os.path.normpath(os.path.join(dirpath, name)) 711 zf.write(path, path) 712 if logger is not None: 713 logger.info("adding '%s'", path) 714 for name in filenames: 715 path = os.path.normpath(os.path.join(dirpath, name)) 716 if os.path.isfile(path): 717 zf.write(path, path) 718 if logger is not None: 719 logger.info("adding '%s'", path) 720 721 return zip_filename 722 723_ARCHIVE_FORMATS = { 724 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"), 725} 726 727if _ZLIB_SUPPORTED: 728 _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')], 729 "gzip'ed tar-file") 730 _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file") 731 732if _BZ2_SUPPORTED: 733 _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')], 734 "bzip2'ed tar-file") 735 736if _LZMA_SUPPORTED: 737 _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')], 738 "xz'ed tar-file") 739 740def get_archive_formats(): 741 """Returns a list of supported formats for archiving and unarchiving. 742 743 Each element of the returned sequence is a tuple (name, description) 744 """ 745 formats = [(name, registry[2]) for name, registry in 746 _ARCHIVE_FORMATS.items()] 747 formats.sort() 748 return formats 749 750def register_archive_format(name, function, extra_args=None, description=''): 751 """Registers an archive format. 752 753 name is the name of the format. function is the callable that will be 754 used to create archives. If provided, extra_args is a sequence of 755 (name, value) tuples that will be passed as arguments to the callable. 756 description can be provided to describe the format, and will be returned 757 by the get_archive_formats() function. 758 """ 759 if extra_args is None: 760 extra_args = [] 761 if not callable(function): 762 raise TypeError('The %s object is not callable' % function) 763 if not isinstance(extra_args, (tuple, list)): 764 raise TypeError('extra_args needs to be a sequence') 765 for element in extra_args: 766 if not isinstance(element, (tuple, list)) or len(element) !=2: 767 raise TypeError('extra_args elements are : (arg_name, value)') 768 769 _ARCHIVE_FORMATS[name] = (function, extra_args, description) 770 771def unregister_archive_format(name): 772 del _ARCHIVE_FORMATS[name] 773 774def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0, 775 dry_run=0, owner=None, group=None, logger=None): 776 """Create an archive file (eg. zip or tar). 777 778 'base_name' is the name of the file to create, minus any format-specific 779 extension; 'format' is the archive format: one of "zip", "tar", "gztar", 780 "bztar", or "xztar". Or any other registered format. 781 782 'root_dir' is a directory that will be the root directory of the 783 archive; ie. we typically chdir into 'root_dir' before creating the 784 archive. 'base_dir' is the directory where we start archiving from; 785 ie. 'base_dir' will be the common prefix of all files and 786 directories in the archive. 'root_dir' and 'base_dir' both default 787 to the current directory. Returns the name of the archive file. 788 789 'owner' and 'group' are used when creating a tar archive. By default, 790 uses the current owner and group. 791 """ 792 save_cwd = os.getcwd() 793 if root_dir is not None: 794 if logger is not None: 795 logger.debug("changing into '%s'", root_dir) 796 base_name = os.path.abspath(base_name) 797 if not dry_run: 798 os.chdir(root_dir) 799 800 if base_dir is None: 801 base_dir = os.curdir 802 803 kwargs = {'dry_run': dry_run, 'logger': logger} 804 805 try: 806 format_info = _ARCHIVE_FORMATS[format] 807 except KeyError: 808 raise ValueError("unknown archive format '%s'" % format) from None 809 810 func = format_info[0] 811 for arg, val in format_info[1]: 812 kwargs[arg] = val 813 814 if format != 'zip': 815 kwargs['owner'] = owner 816 kwargs['group'] = group 817 818 try: 819 filename = func(base_name, base_dir, **kwargs) 820 finally: 821 if root_dir is not None: 822 if logger is not None: 823 logger.debug("changing back to '%s'", save_cwd) 824 os.chdir(save_cwd) 825 826 return filename 827 828 829def get_unpack_formats(): 830 """Returns a list of supported formats for unpacking. 831 832 Each element of the returned sequence is a tuple 833 (name, extensions, description) 834 """ 835 formats = [(name, info[0], info[3]) for name, info in 836 _UNPACK_FORMATS.items()] 837 formats.sort() 838 return formats 839 840def _check_unpack_options(extensions, function, extra_args): 841 """Checks what gets registered as an unpacker.""" 842 # first make sure no other unpacker is registered for this extension 843 existing_extensions = {} 844 for name, info in _UNPACK_FORMATS.items(): 845 for ext in info[0]: 846 existing_extensions[ext] = name 847 848 for extension in extensions: 849 if extension in existing_extensions: 850 msg = '%s is already registered for "%s"' 851 raise RegistryError(msg % (extension, 852 existing_extensions[extension])) 853 854 if not callable(function): 855 raise TypeError('The registered function must be a callable') 856 857 858def register_unpack_format(name, extensions, function, extra_args=None, 859 description=''): 860 """Registers an unpack format. 861 862 `name` is the name of the format. `extensions` is a list of extensions 863 corresponding to the format. 864 865 `function` is the callable that will be 866 used to unpack archives. The callable will receive archives to unpack. 867 If it's unable to handle an archive, it needs to raise a ReadError 868 exception. 869 870 If provided, `extra_args` is a sequence of 871 (name, value) tuples that will be passed as arguments to the callable. 872 description can be provided to describe the format, and will be returned 873 by the get_unpack_formats() function. 874 """ 875 if extra_args is None: 876 extra_args = [] 877 _check_unpack_options(extensions, function, extra_args) 878 _UNPACK_FORMATS[name] = extensions, function, extra_args, description 879 880def unregister_unpack_format(name): 881 """Removes the pack format from the registry.""" 882 del _UNPACK_FORMATS[name] 883 884def _ensure_directory(path): 885 """Ensure that the parent directory of `path` exists""" 886 dirname = os.path.dirname(path) 887 if not os.path.isdir(dirname): 888 os.makedirs(dirname) 889 890def _unpack_zipfile(filename, extract_dir): 891 """Unpack zip `filename` to `extract_dir` 892 """ 893 import zipfile # late import for breaking circular dependency 894 895 if not zipfile.is_zipfile(filename): 896 raise ReadError("%s is not a zip file" % filename) 897 898 zip = zipfile.ZipFile(filename) 899 try: 900 for info in zip.infolist(): 901 name = info.filename 902 903 # don't extract absolute paths or ones with .. in them 904 if name.startswith('/') or '..' in name: 905 continue 906 907 target = os.path.join(extract_dir, *name.split('/')) 908 if not target: 909 continue 910 911 _ensure_directory(target) 912 if not name.endswith('/'): 913 # file 914 data = zip.read(info.filename) 915 f = open(target, 'wb') 916 try: 917 f.write(data) 918 finally: 919 f.close() 920 del data 921 finally: 922 zip.close() 923 924def _unpack_tarfile(filename, extract_dir): 925 """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` 926 """ 927 import tarfile # late import for breaking circular dependency 928 try: 929 tarobj = tarfile.open(filename) 930 except tarfile.TarError: 931 raise ReadError( 932 "%s is not a compressed or uncompressed tar file" % filename) 933 try: 934 tarobj.extractall(extract_dir) 935 finally: 936 tarobj.close() 937 938_UNPACK_FORMATS = { 939 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"), 940 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"), 941} 942 943if _ZLIB_SUPPORTED: 944 _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [], 945 "gzip'ed tar-file") 946 947if _BZ2_SUPPORTED: 948 _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [], 949 "bzip2'ed tar-file") 950 951if _LZMA_SUPPORTED: 952 _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [], 953 "xz'ed tar-file") 954 955def _find_unpack_format(filename): 956 for name, info in _UNPACK_FORMATS.items(): 957 for extension in info[0]: 958 if filename.endswith(extension): 959 return name 960 return None 961 962def unpack_archive(filename, extract_dir=None, format=None): 963 """Unpack an archive. 964 965 `filename` is the name of the archive. 966 967 `extract_dir` is the name of the target directory, where the archive 968 is unpacked. If not provided, the current working directory is used. 969 970 `format` is the archive format: one of "zip", "tar", "gztar", "bztar", 971 or "xztar". Or any other registered format. If not provided, 972 unpack_archive will use the filename extension and see if an unpacker 973 was registered for that extension. 974 975 In case none is found, a ValueError is raised. 976 """ 977 if extract_dir is None: 978 extract_dir = os.getcwd() 979 980 extract_dir = os.fspath(extract_dir) 981 filename = os.fspath(filename) 982 983 if format is not None: 984 try: 985 format_info = _UNPACK_FORMATS[format] 986 except KeyError: 987 raise ValueError("Unknown unpack format '{0}'".format(format)) from None 988 989 func = format_info[1] 990 func(filename, extract_dir, **dict(format_info[2])) 991 else: 992 # we need to look at the registered unpackers supported extensions 993 format = _find_unpack_format(filename) 994 if format is None: 995 raise ReadError("Unknown archive format '{0}'".format(filename)) 996 997 func = _UNPACK_FORMATS[format][1] 998 kwargs = dict(_UNPACK_FORMATS[format][2]) 999 func(filename, extract_dir, **kwargs) 1000 1001 1002if hasattr(os, 'statvfs'): 1003 1004 __all__.append('disk_usage') 1005 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 1006 _ntuple_diskusage.total.__doc__ = 'Total space in bytes' 1007 _ntuple_diskusage.used.__doc__ = 'Used space in bytes' 1008 _ntuple_diskusage.free.__doc__ = 'Free space in bytes' 1009 1010 def disk_usage(path): 1011 """Return disk usage statistics about the given path. 1012 1013 Returned value is a named tuple with attributes 'total', 'used' and 1014 'free', which are the amount of total, used and free space, in bytes. 1015 """ 1016 st = os.statvfs(path) 1017 free = st.f_bavail * st.f_frsize 1018 total = st.f_blocks * st.f_frsize 1019 used = (st.f_blocks - st.f_bfree) * st.f_frsize 1020 return _ntuple_diskusage(total, used, free) 1021 1022elif os.name == 'nt': 1023 1024 import nt 1025 __all__.append('disk_usage') 1026 _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 1027 1028 def disk_usage(path): 1029 """Return disk usage statistics about the given path. 1030 1031 Returned values is a named tuple with attributes 'total', 'used' and 1032 'free', which are the amount of total, used and free space, in bytes. 1033 """ 1034 total, free = nt._getdiskusage(path) 1035 used = total - free 1036 return _ntuple_diskusage(total, used, free) 1037 1038 1039def chown(path, user=None, group=None): 1040 """Change owner user and group of the given path. 1041 1042 user and group can be the uid/gid or the user/group names, and in that case, 1043 they are converted to their respective uid/gid. 1044 """ 1045 1046 if user is None and group is None: 1047 raise ValueError("user and/or group must be set") 1048 1049 _user = user 1050 _group = group 1051 1052 # -1 means don't change it 1053 if user is None: 1054 _user = -1 1055 # user can either be an int (the uid) or a string (the system username) 1056 elif isinstance(user, str): 1057 _user = _get_uid(user) 1058 if _user is None: 1059 raise LookupError("no such user: {!r}".format(user)) 1060 1061 if group is None: 1062 _group = -1 1063 elif not isinstance(group, int): 1064 _group = _get_gid(group) 1065 if _group is None: 1066 raise LookupError("no such group: {!r}".format(group)) 1067 1068 os.chown(path, _user, _group) 1069 1070def get_terminal_size(fallback=(80, 24)): 1071 """Get the size of the terminal window. 1072 1073 For each of the two dimensions, the environment variable, COLUMNS 1074 and LINES respectively, is checked. If the variable is defined and 1075 the value is a positive integer, it is used. 1076 1077 When COLUMNS or LINES is not defined, which is the common case, 1078 the terminal connected to sys.__stdout__ is queried 1079 by invoking os.get_terminal_size. 1080 1081 If the terminal size cannot be successfully queried, either because 1082 the system doesn't support querying, or because we are not 1083 connected to a terminal, the value given in fallback parameter 1084 is used. Fallback defaults to (80, 24) which is the default 1085 size used by many terminal emulators. 1086 1087 The value returned is a named tuple of type os.terminal_size. 1088 """ 1089 # columns, lines are the working values 1090 try: 1091 columns = int(os.environ['COLUMNS']) 1092 except (KeyError, ValueError): 1093 columns = 0 1094 1095 try: 1096 lines = int(os.environ['LINES']) 1097 except (KeyError, ValueError): 1098 lines = 0 1099 1100 # only query if necessary 1101 if columns <= 0 or lines <= 0: 1102 try: 1103 size = os.get_terminal_size(sys.__stdout__.fileno()) 1104 except (AttributeError, ValueError, OSError): 1105 # stdout is None, closed, detached, or not a terminal, or 1106 # os.get_terminal_size() is unsupported 1107 size = os.terminal_size(fallback) 1108 if columns <= 0: 1109 columns = size.columns 1110 if lines <= 0: 1111 lines = size.lines 1112 1113 return os.terminal_size((columns, lines)) 1114 1115def which(cmd, mode=os.F_OK | os.X_OK, path=None): 1116 """Given a command, mode, and a PATH string, return the path which 1117 conforms to the given mode on the PATH, or None if there is no such 1118 file. 1119 1120 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result 1121 of os.environ.get("PATH"), or can be overridden with a custom search 1122 path. 1123 1124 """ 1125 # Check that a given file can be accessed with the correct mode. 1126 # Additionally check that `file` is not a directory, as on Windows 1127 # directories pass the os.access check. 1128 def _access_check(fn, mode): 1129 return (os.path.exists(fn) and os.access(fn, mode) 1130 and not os.path.isdir(fn)) 1131 1132 # If we're given a path with a directory part, look it up directly rather 1133 # than referring to PATH directories. This includes checking relative to the 1134 # current directory, e.g. ./script 1135 if os.path.dirname(cmd): 1136 if _access_check(cmd, mode): 1137 return cmd 1138 return None 1139 1140 if path is None: 1141 path = os.environ.get("PATH", os.defpath) 1142 if not path: 1143 return None 1144 path = path.split(os.pathsep) 1145 1146 if sys.platform == "win32": 1147 # The current directory takes precedence on Windows. 1148 if not os.curdir in path: 1149 path.insert(0, os.curdir) 1150 1151 # PATHEXT is necessary to check on Windows. 1152 pathext = os.environ.get("PATHEXT", "").split(os.pathsep) 1153 # See if the given file matches any of the expected path extensions. 1154 # This will allow us to short circuit when given "python.exe". 1155 # If it does match, only test that one, otherwise we have to try 1156 # others. 1157 if any(cmd.lower().endswith(ext.lower()) for ext in pathext): 1158 files = [cmd] 1159 else: 1160 files = [cmd + ext for ext in pathext] 1161 else: 1162 # On other platforms you don't have things like PATHEXT to tell you 1163 # what file suffixes are executable, so just pass on cmd as-is. 1164 files = [cmd] 1165 1166 seen = set() 1167 for dir in path: 1168 normdir = os.path.normcase(dir) 1169 if not normdir in seen: 1170 seen.add(normdir) 1171 for thefile in files: 1172 name = os.path.join(dir, thefile) 1173 if _access_check(name, mode): 1174 return name 1175 return None 1176