# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of Unix-like rsync command."""

from __future__ import absolute_import

import errno
import heapq
import io
from itertools import islice
import os
import re
import tempfile
import textwrap
import traceback
import urllib

from boto import config
import crcmod

from gslib import copy_helper
from gslib.bucket_listing_ref import BucketListingObject
from gslib.cloud_api import NotFoundException
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.copy_helper import CreateCopyHelperOpts
from gslib.copy_helper import SkipUnsupportedObjectError
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.hashing_helper import SLOW_CRCMOD_WARNING
from gslib.plurality_checkable_iterator import PluralityCheckableIterator
from gslib.sig_handling import GetCaughtSignals
from gslib.sig_handling import RegisterSignalHandler
from gslib.storage_url import StorageUrlFromString
from gslib.util import GetCloudApiInstance
from gslib.util import IsCloudSubdirPlaceholder
from gslib.util import TEN_MIB
from gslib.util import UsingCrcmodExtension
from gslib.util import UTF8
from gslib.wildcard_iterator import CreateWildcardIterator


_SYNOPSIS = """
  gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-r] [-U] [-x] src_url dst_url
"""

_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """


<B>DESCRIPTION</B>
  The gsutil rsync command makes the contents under dst_url the same as the
  contents under src_url, by copying any missing files/objects, and (if the
  -d option is specified) deleting any extra files/objects. For example, to
  make gs://mybucket/data match the contents of the local directory "data"
  you could do:

    gsutil rsync -d data gs://mybucket/data

  To recurse into directories use the -r option:

    gsutil rsync -d -r data gs://mybucket/data

  To copy only new/changed files without deleting extra files from
  gs://mybucket/data leave off the -d option:

    gsutil rsync -r data gs://mybucket/data

  If you have a large number of objects to synchronize you might want to use
  the gsutil -m option, to perform parallel (multi-threaded/multi-processing)
  synchronization:

    gsutil -m rsync -d -r data gs://mybucket/data

  The -m option typically will provide a large performance boost if either the
  source or destination (or both) is a cloud URL. If both source and
  destination are file URLs the -m option will typically thrash the disk and
  slow synchronization down.

  To make the local directory "data" the same as the contents of
  gs://mybucket/data:

    gsutil rsync -d -r gs://mybucket/data data

  To make the contents of gs://mybucket2 the same as gs://mybucket1:

    gsutil rsync -d -r gs://mybucket1 gs://mybucket2

  You can also mirror data across local directories:

    gsutil rsync -d -r dir1 dir2

  To mirror your content across clouds:

    gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket

  Note: If you are synchronizing a large amount of data between clouds you
  might consider setting up a
  `Google Compute Engine <https://cloud.google.com/products/compute-engine>`_
  account and running gsutil there. Since cross-provider gsutil data transfers
  flow through the machine where gsutil is running, doing this can make your
  transfer run significantly faster than running gsutil on your local
  workstation.


<B>BE CAREFUL WHEN USING -d OPTION!</B>
  The rsync -d option is very useful and commonly used, because it provides a
  means of making the contents of a destination bucket or directory match
  those of a source bucket or directory. However, please exercise caution when
  you use this option: It's possible to delete large amounts of data
  accidentally if, for example, you erroneously reverse source and
  destination. For example, if you meant to synchronize a local directory from
  a bucket in the cloud but instead run the command:

    gsutil -m rsync -r -d ./your-dir gs://your-bucket

  and your-dir is currently empty, you will quickly delete all of the objects
  in gs://your-bucket.

  You can also cause large amounts of data to be lost quickly by specifying a
  subdirectory of the destination as the source of an rsync. For example, the
  command:

    gsutil -m rsync -r -d gs://your-bucket/data gs://your-bucket

  would cause most or all of the objects in gs://your-bucket to be deleted
  (some objects may survive if there are any with names that sort lower than
  "data" under gs://your-bucket/data).

  In addition to paying careful attention to the source and destination you
  specify with the rsync command, there are two more safety measures you can
  take when using gsutil rsync -d:

  1. Try running the command with the rsync -n option first, to see what it
     would do without actually performing the operations. For example, if
     you run the command:

       gsutil -m rsync -r -d -n gs://your-bucket/data gs://your-bucket

     it will be immediately evident that running that command without the -n
     option would cause many objects to be deleted.

  2. Enable object versioning in your bucket, which will allow you to restore
     objects if you accidentally delete them. For more details see
     "gsutil help versions".


<B>IMPACT OF BUCKET LISTING EVENTUAL CONSISTENCY</B>
  The rsync command operates by listing the source and destination URLs, and
  then performing copy and remove operations according to the differences
  between these listings. Because bucket listing is eventually (not strongly)
  consistent, if you upload new objects or delete objects from a bucket and
  then immediately run gsutil rsync with that bucket as the source or
  destination, it's possible the rsync command will not see the recent
  updates and thus synchronize incorrectly. You can rerun the rsync operation
  later to correct any such incorrect synchronization.


<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
  At the end of every upload or download, the gsutil rsync command validates
  that the checksum of the source file/object matches the checksum of the
  destination file/object. If the checksums do not match, gsutil will delete
  the invalid copy and print a warning message. This very rarely happens, but
  if it does, please contact gs-team@google.com.

  The rsync command will retry when failures occur, but if enough failures
  happen during a particular copy or delete operation the command will skip
  that object and move on. At the end of the synchronization run, if any
  failures were not successfully retried, the rsync command will report the
  count of failures and exit with non-zero status. At this point you can run
  the rsync command again, and it will attempt any remaining needed copy
  and/or delete operations.

  Note that there are cases where retrying will never succeed, such as if you
  don't have write permission to the destination bucket or if the destination
  path for some objects is longer than the maximum allowed length.

  For more details about gsutil's retry handling, please see
  "gsutil help retries".


<B>CHANGE DETECTION ALGORITHM</B>
  To determine if a file or object has changed, gsutil rsync first checks
  whether the source and destination sizes match. If they match, it next
  compares checksums, using whatever checksums are available (see below).
  Unlike the Unix rsync command, gsutil rsync does not use timestamps to
  determine if the file/object changed, because the GCS API does not permit
  the caller to set an object's timestamp (hence, timestamps of identical
  files/objects cannot be made to match).

  Checksums will not be available in two cases:

  1. When synchronizing to or from a file system. By default, gsutil does not
     checksum files, because of the slowdown caused when working with large
     files. You can cause gsutil to checksum files by using the gsutil rsync
     -c option, at the cost of increased local disk I/O and run time when
     working with large files. You should consider using the -c option if
     your files can change without changing sizes (e.g., if you have files
     that contain fixed width data, such as timestamps).

  2. When comparing composite GCS objects with objects at a cloud provider
     that does not support CRC32C (which is the only checksum available for
     composite objects). See 'gsutil help compose' for details about
     composite objects.


<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
  If both the source and destination URL are cloud URLs from the same
  provider, gsutil copies data "in the cloud" (i.e., without downloading to
  and uploading from the machine where you run gsutil). In addition to the
  performance and cost advantages of doing this, copying in the cloud
  preserves metadata (like Content-Type and Cache-Control). In contrast, when
  you download data from the cloud it ends up in a file, which has no
  associated metadata. Thus, unless you have some way to hold on to or
  re-create that metadata, synchronizing a bucket to a directory in the local
  file system will not retain the metadata.

  Note that by default, the gsutil rsync command does not copy the ACLs of
  objects being synchronized and instead will use the default bucket ACL (see
  "gsutil help defacl"). You can override this behavior with the -p option
  (see OPTIONS below).


<B>SLOW CHECKSUMS</B>
  If you find that CRC32C checksum computation runs slowly, this is likely
  because you don't have a compiled CRC32C implementation on your system. Try
  running:

    gsutil ver -l

  If the output contains:

    compiled crcmod: False

  you are running a Python library for computing CRC32C, which is much slower
  than using the compiled code. For information on getting a compiled CRC32C
  implementation, see 'gsutil help crc32c'.


<B>LIMITATIONS</B>
  1. The gsutil rsync command doesn't make the destination object's
     timestamps match those of the source object (it can't; timestamp setting
     is not allowed by the GCS API).

  2. The gsutil rsync command considers only the current object generations
     in the source and destination buckets when deciding what to copy /
     delete. If versioning is enabled in the destination bucket then gsutil
     rsync's overwriting or deleting objects will end up creating versions,
     but the command doesn't try to make the archived generations match in
     the source and destination buckets.


<B>OPTIONS</B>
  -c             Causes the rsync command to compute checksums for files if
                 the sizes of source and destination match, and then compare
                 checksums. This option increases local disk I/O and run time
                 if either src_url or dst_url are on the local file system.

  -C             If an error occurs, continue to attempt to copy the
                 remaining files. If errors occurred, gsutil's exit status
                 will be non-zero even if this flag is set. This option is
                 implicitly set when running "gsutil -m rsync...". Note: -C
                 only applies to the actual copying operation. If an error
                 occurs while iterating over the files in the local directory
                 (e.g., invalid Unicode file name) gsutil will print an error
                 message and abort.

  -d             Delete extra files under dst_url not found under src_url. By
                 default extra files are not deleted. Note: this option can
                 delete data quickly if you specify the wrong
                 source/destination combination. See the help section above,
                 "BE CAREFUL WHEN USING -d OPTION!".

  -e             Exclude symlinks. When specified, symbolic links will be
                 ignored.

  -n             Causes rsync to run in "dry run" mode, i.e., just outputting
                 what would be copied or deleted without actually doing any
                 copying/deleting.

  -p             Causes ACLs to be preserved when synchronizing in the cloud.
                 Note that this option has performance and cost implications
                 when using the XML API, as it requires separate HTTP calls
                 for interacting with ACLs. The performance issue can be
                 mitigated to some degree by using gsutil -m rsync to cause
                 parallel synchronization. Also, this option only works if you
                 have OWNER access to all of the objects that are copied.

                 If you want all objects in the destination bucket to end up
                 with the same ACL, you can avoid the additional performance
                 and cost of rsync -p by setting a default object ACL on that
                 bucket instead. See 'gsutil help defacl'.

  -R, -r         Causes directories, buckets, and bucket subdirectories to be
                 synchronized recursively. If you neglect to use this option
                 gsutil will make only the top-level directory in the source
                 and destination URLs match, skipping any sub-directories.

  -U             Skip objects with unsupported object types instead of
                 failing. Unsupported object types are Amazon S3 objects in
                 the GLACIER storage class.

  -x pattern     Causes files/objects matching pattern to be excluded, i.e.,
                 any matching files/objects will not be copied or deleted.
                 Note that the pattern is a Python regular expression, not a
                 wildcard (so, matching any string ending in 'abc' would be
                 specified using '.*abc' rather than '*abc'). Note also that
                 the exclude path is always relative (similar to Unix rsync
                 or tar exclude options). For example, if you run the command:

                   gsutil rsync -x 'data./.*\\.txt' dir gs://my-bucket

                 it will skip the file dir/data1/a.txt.

                 You can use regex alternation to specify multiple exclusions,
                 for example:

                   gsutil rsync -x '.*\\.txt|.*\\.jpg' dir gs://my-bucket
""")


class _DiffAction(object):
  COPY = 'copy'
  REMOVE = 'remove'


_NA = '-'
_OUTPUT_BUFFER_SIZE = 64 * 1024
_PROGRESS_REPORT_LISTING_COUNT = 10000


# Tracks files we need to clean up at end or if interrupted.
_tmp_files = []


# pylint: disable=unused-argument
def _HandleSignals(signal_num, cur_stack_frame):
  """Called when rsync command is killed with SIGINT, SIGQUIT or SIGTERM."""
  CleanUpTempFiles()


def CleanUpTempFiles():
  """Cleans up temp files.

  This function allows the main (RunCommand) function to clean up at end of
  operation, or if gsutil rsync is interrupted (e.g., via ^C). This is
  necessary because tempfile.NamedTemporaryFile doesn't allow the created file
  to be re-opened in read mode on Windows, so we have to use tempfile.mkstemp,
  which doesn't automatically delete temp files.
  """
  try:
    for fname in _tmp_files:
      os.unlink(fname)
  except:  # pylint: disable=bare-except
    pass


class _DiffToApply(object):
  """Class that encapsulates info needed to apply diff for one object."""

  def __init__(self, src_url_str, dst_url_str, diff_action):
    """Constructor.

    Args:
      src_url_str: The source URL string, or None if diff_action is REMOVE.
      dst_url_str: The destination URL string.
      diff_action: _DiffAction to be applied.
    """
    self.src_url_str = src_url_str
    self.dst_url_str = dst_url_str
    self.diff_action = diff_action


def _DiffToApplyArgChecker(command_instance, diff_to_apply):
  """Arg checker that skips symlinks if -e flag specified."""
  if (diff_to_apply.diff_action == _DiffAction.REMOVE
      or not command_instance.exclude_symlinks):
    # No src URL is populated for REMOVE actions.
    return True
  exp_src_url = StorageUrlFromString(diff_to_apply.src_url_str)
  if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
    command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
    return False
  return True


def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c,
                                src_md5, dst_url_str, dst_size, dst_crc32c,
                                dst_md5):
  """Computes any file checksums needed by _ObjectsMatch.

  Args:
    logger: logging.logger for outputting log messages.
    src_url_str: Source URL string.
    src_size: Source size.
    src_crc32c: Source CRC32c.
    src_md5: Source MD5.
    dst_url_str: Destination URL string.
    dst_size: Destination size.
    dst_crc32c: Destination CRC32c.
    dst_md5: Destination MD5.

  Returns:
    (src_crc32c, src_md5, dst_crc32c, dst_md5)
  """
  src_url = StorageUrlFromString(src_url_str)
  dst_url = StorageUrlFromString(dst_url_str)
  if src_url.IsFileUrl():
    if dst_crc32c != _NA or dst_url.IsFileUrl():
      if src_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif dst_md5 != _NA or dst_url.IsFileUrl():
      if src_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_md5 = CalculateB64EncodedMd5FromContents(fp)
  if dst_url.IsFileUrl():
    if src_crc32c != _NA:
      if dst_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif src_md5 != _NA:
      if dst_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_md5 = CalculateB64EncodedMd5FromContents(fp)
  return (src_crc32c, src_md5, dst_crc32c, dst_md5)


def _ListUrlRootFunc(cls, args_tuple, thread_state=None):
  """Worker function for listing files/objects under a base URL to be sync'd.

  Outputs sorted list to out_filename, formatted per _BuildTmpOutputLine. We
  sort the listed URLs because we don't want to depend on consistent sort
  order across file systems and cloud providers.

  Args:
    cls: Command instance.
    args_tuple: (base_url_str, out_filename, desc), where base_url_str is the
                top-level URL string to list; out_filename is the name of the
                file to which sorted output should be written; desc is
                'source' or 'destination'.
    thread_state: gsutil Cloud API instance to use.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  (base_url_str, out_filename, desc) = args_tuple
  # We sort while iterating over base_url_str, allowing parallelism of batched
  # sorting with collecting the listing.
  out_file = io.open(out_filename, mode='w', encoding=UTF8)
  try:
    _BatchSort(_FieldedListingIterator(cls, gsutil_api, base_url_str, desc),
               out_file)
  except Exception as e:  # pylint: disable=broad-except
    # Abandon rsync if an exception percolates up to this layer - retryable
    # exceptions are handled in the lower layers, so we got a non-retryable
    # exception (like 404 bucket not found) and proceeding would either be
    # futile or could result in data loss - for example:
    #   gsutil rsync -d gs://non-existent-bucket ./localdir
    # would delete files from localdir.
    cls.logger.error(
        'Caught non-retryable exception while listing %s: %s' %
        (base_url_str, e))
    cls.non_retryable_listing_failures = 1
  out_file.close()


def _LocalDirIterator(base_url):
  """A generator that yields a BLR for each file in a local directory.

  We use this function instead of WildcardIterator for listing a local
  directory without recursion, because the glob.glob implementation called
  by WildcardIterator skips "dot" files (which we don't want to do when
  synchronizing to or from a local directory).

  Args:
    base_url: URL for the directory over which to iterate.

  Yields:
    BucketListingObject for each file in the directory.
  """
  for filename in os.listdir(base_url.object_name):
    filename = os.path.join(base_url.object_name, filename)
    if os.path.isfile(filename):
      yield BucketListingObject(StorageUrlFromString(filename), None)


def _FieldedListingIterator(cls, gsutil_api, base_url_str, desc):
  """Iterator over base_url_str, formatting output per _BuildTmpOutputLine.

  Args:
    cls: Command instance.
    gsutil_api: gsutil Cloud API instance to use for bucket listing.
    base_url_str: The top-level URL string over which to iterate.
    desc: 'source' or 'destination'.

  Yields:
    Output line formatted per _BuildTmpOutputLine.
  """
  base_url = StorageUrlFromString(base_url_str)
  if base_url.scheme == 'file' and not cls.recursion_requested:
    iterator = _LocalDirIterator(base_url)
  else:
    if cls.recursion_requested:
      wildcard = '%s/**' % base_url_str.rstrip('/\\')
    else:
      wildcard = '%s/*' % base_url_str.rstrip('/\\')
    iterator = CreateWildcardIterator(
        wildcard, gsutil_api, debug=cls.debug,
        project_id=cls.project_id).IterObjects(
            # Request just the needed fields, to reduce bandwidth usage.
            bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size'])

  i = 0
  for blr in iterator:
    # Various GUI tools (like the GCS web console) create placeholder objects
    # ending with '/' when the user creates an empty directory. Normally
    # these tools should delete those placeholders once objects have been
    # written "under" the directory, but sometimes the placeholders are left
    # around. We need to filter them out here, otherwise if the user tries to
    # rsync from GCS to a local directory it will result in a directory/file
    # conflict (e.g., trying to download an object called "mydata/" where the
    # local directory "mydata" exists).
    url = blr.storage_url
    if IsCloudSubdirPlaceholder(url, blr=blr):
      # We used to output the message 'Skipping cloud sub-directory
      # placeholder object...' but we no longer do so because it caused
      # customer confusion.
      continue
    if (cls.exclude_symlinks and url.IsFileUrl()
        and os.path.islink(url.object_name)):
      continue
    if cls.exclude_pattern:
      str_to_check = url.url_string[len(base_url_str):]
      if str_to_check.startswith(url.delim):
        str_to_check = str_to_check[1:]
      if cls.exclude_pattern.match(str_to_check):
        continue
    i += 1
    if i % _PROGRESS_REPORT_LISTING_COUNT == 0:
      cls.logger.info('At %s listing %d...', desc, i)
    yield _BuildTmpOutputLine(blr)


def _BuildTmpOutputLine(blr):
  """Builds line to output to temp file for given BucketListingRef.

  Args:
    blr: The BucketListingRef.

  Returns:
    The output line, formatted as _EncodeUrl(URL)<sp>size<sp>crc32c<sp>md5
    where crc32c will only be present for GCS URLs, and md5 will only be
    present for cloud URLs that aren't composite objects. A missing field is
    populated with '-'.
  """
  crc32c = _NA
  md5 = _NA
  url = blr.storage_url
  if url.IsFileUrl():
    size = os.path.getsize(url.object_name)
  elif url.IsCloudUrl():
    size = blr.root_object.size
    crc32c = blr.root_object.crc32c or _NA
    md5 = blr.root_object.md5Hash or _NA
  else:
    raise CommandException('Got unexpected URL type (%s)' % url.scheme)
  return '%s %d %s %s\n' % (_EncodeUrl(url.url_string), size, crc32c, md5)


def _EncodeUrl(url_string):
  """Encodes url_string with quote plus encoding and UTF8 character encoding.

  We use this for all URL encodings.

  Args:
    url_string: String URL to encode.

  Returns:
    encoded URL.
  """
  return urllib.quote_plus(url_string.encode(UTF8))


def _DecodeUrl(enc_url_string):
  """Inverts encoding from _EncodeUrl.

  Args:
    enc_url_string: String URL to decode.

  Returns:
    decoded URL.
  """
  return urllib.unquote_plus(enc_url_string).decode(UTF8)


# pylint: disable=bare-except
def _BatchSort(in_iter, out_file):
  """Sorts input lines from in_iter and outputs to out_file.

  Sorts in batches as input arrives, so the input does not need to be loaded
  into memory all at once. Derived from Python Recipe 466302: Sorting big
  files the Python 2.4 way, by Nicolas Lehuen.

  Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line
  when we could just sort on the first record (URL); but the sort order is
  identical either way.

  Args:
    in_iter: Input iterator.
    out_file: Output file.
  """
  # Note: If chunk_files gets very large we can run out of open FDs. See .boto
  # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines
  # doesn't suffice (e.g., for someone synchronizing with a really large
  # bucket), an option would be to make gsutil merge in passes, never
  # opening all chunk files simultaneously.
  buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000)
  chunk_files = []
  try:
    while True:
      current_chunk = sorted(islice(in_iter, buffer_size))
      if not current_chunk:
        break
      output_chunk = io.open('%s-%06i' % (out_file.name, len(chunk_files)),
                             mode='w+', encoding=UTF8)
      chunk_files.append(output_chunk)
      output_chunk.writelines(unicode(''.join(current_chunk)))
      output_chunk.flush()
      output_chunk.seek(0)
    out_file.writelines(heapq.merge(*chunk_files))
  except IOError as e:
    if e.errno == errno.EMFILE:
      raise CommandException('\n'.join(textwrap.wrap(
          'Synchronization failed because too many open file handles were '
          'needed while building synchronization state. Please see the '
          'comments about rsync_buffer_lines in your .boto config file for a '
          'possible way to address this problem.')))
    raise
  finally:
    for chunk_file in chunk_files:
      try:
        chunk_file.close()
        os.remove(chunk_file.name)
      except:
        pass


class _DiffIterator(object):
  """Iterator yielding sequence of _DiffToApply objects."""

  def __init__(self, command_obj, base_src_url, base_dst_url):
    self.command_obj = command_obj
    self.compute_file_checksums = command_obj.compute_file_checksums
    self.delete_extras = command_obj.delete_extras
    self.recursion_requested = command_obj.recursion_requested
    self.logger = self.command_obj.logger
    self.base_src_url = base_src_url
    self.base_dst_url = base_dst_url
    self.logger.info('Building synchronization state...')

    (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-src-')
    _tmp_files.append(self.sorted_list_src_file_name)
    (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-dst-')
    _tmp_files.append(self.sorted_list_dst_file_name)
    # Close the file handles; the file will be opened in write mode by
    # _ListUrlRootFunc.
    os.close(src_fh)
    os.close(dst_fh)

    # Build sorted lists of src and dst URLs in parallel.
    # To do this, pass args to _ListUrlRootFunc as a tuple
    # (base_url_str, out_filename, desc), where base_url_str is the starting
    # URL string for listing.
    args_iter = iter([
        (self.base_src_url.url_string, self.sorted_list_src_file_name,
         'source'),
        (self.base_dst_url.url_string, self.sorted_list_dst_file_name,
         'destination')
    ])

    # Contains error message from non-retryable listing failure.
    command_obj.non_retryable_listing_failures = 0
    shared_attrs = ['non_retryable_listing_failures']
    command_obj.Apply(_ListUrlRootFunc, args_iter, _RootListingExceptionHandler,
                      shared_attrs, arg_checker=DummyArgChecker,
                      parallel_operations_override=True,
                      fail_on_error=True)

    if command_obj.non_retryable_listing_failures:
      raise CommandException('Caught non-retryable exception - aborting rsync')

    self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r')
    self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r')

    # Wrap iterators in PluralityCheckableIterator so we can check emptiness.
    self.sorted_src_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_src_file))
    self.sorted_dst_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_dst_file))

  def _ParseTmpFileLine(self, line):
    """Parses output from _BuildTmpOutputLine.

    Parses into tuple:
      (URL, size, crc32c, md5)
    where crc32c and/or md5 can be _NA.

    Args:
      line: The line to parse.

    Returns:
      Parsed tuple: (url, size, crc32c, md5)
    """
    (encoded_url, size, crc32c, md5) = line.split()
    return (_DecodeUrl(encoded_url), int(size), crc32c, md5.strip())

  def _WarnIfMissingCloudHash(self, url_str, crc32c, md5):
    """Warns if url_str is a cloud URL and is missing both crc32c and md5.

    Args:
      url_str: Destination URL string.
      crc32c: Destination CRC32c.
      md5: Destination MD5.

    Returns:
      True if issued warning.
    """
    # One known way this can currently happen is when rsync'ing objects larger
    # than 5 GB from S3 (for which the etag is not an MD5).
    if (StorageUrlFromString(url_str).IsCloudUrl()
        and crc32c == _NA and md5 == _NA):
      self.logger.warn(
          'Found no hashes to validate %s. Integrity cannot be assured '
          'without hashes.', url_str)
      return True
    return False

  def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5,
                    dst_url_str, dst_size, dst_crc32c, dst_md5):
    """Returns True if src and dst objects are the same.

    Uses size plus whatever checksums are available.

    Args:
      src_url_str: Source URL string.
      src_size: Source size.
      src_crc32c: Source CRC32c.
      src_md5: Source MD5.
      dst_url_str: Destination URL string.
      dst_size: Destination size.
      dst_crc32c: Destination CRC32c.
      dst_md5: Destination MD5.

    Returns:
      True/False.
    """
    # Note: This function is called from __iter__, which is called from the
    # Command.Apply driver. Thus, all checksum computation will be run in a
    # single thread, which is good (having multiple threads concurrently
    # computing checksums would thrash the disk).
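    # Comparison order (descriptive summary of the logic below): mismatched
    # sizes mean the objects differ; otherwise compare MD5s if both sides
    # have one, else CRC32Cs if both sides have one; if neither hash pair is
    # available, warn (for cloud URLs missing both hashes) and fall back to
    # the size match alone.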
    if src_size != dst_size:
      return False
    if self.compute_file_checksums:
      (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums(
          self.logger, src_url_str, src_size, src_crc32c, src_md5, dst_url_str,
          dst_size, dst_crc32c, dst_md5)
    if src_md5 != _NA and dst_md5 != _NA:
      self.logger.debug('Comparing md5 for %s and %s', src_url_str, dst_url_str)
      return src_md5 == dst_md5
    if src_crc32c != _NA and dst_crc32c != _NA:
      self.logger.debug(
          'Comparing crc32c for %s and %s', src_url_str, dst_url_str)
      return src_crc32c == dst_crc32c
    if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5):
      self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5)
    # Without checksums to compare we depend only on basic size comparison.
    return True

  def __iter__(self):
    """Iterates over src/dst URLs and produces a _DiffToApply sequence.

    Yields:
      The _DiffToApply.
    """
    # Strip trailing slashes, if any, so we compute tail length against a
    # consistent position regardless of whether trailing slashes were included
    # or not in the URL.
    base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\'))
    base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\'))
    src_url_str = dst_url_str = None
    # Invariant: After each yield, the URLs in src_url_str, dst_url_str,
    # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet
    # processed. Each time we encounter None in src_url_str or dst_url_str we
    # populate from the respective iterator, and we reset one or the other
    # value to None after yielding an action that disposes of that URL.
    while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None:
      if src_url_str is None:
        (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine(
            self.sorted_src_urls_it.next())
        # Skip past the base URL and normalize slashes so we can compare
        # across clouds/file systems (including Windows).
        src_url_str_to_check = _EncodeUrl(
            src_url_str[base_src_url_len:].replace('\\', '/'))
        dst_url_str_would_copy_to = copy_helper.ConstructDstUrl(
            self.base_src_url, StorageUrlFromString(src_url_str), True, True,
            self.base_dst_url, False, self.recursion_requested).url_string
      if self.sorted_dst_urls_it.IsEmpty():
        # We've reached the end of the dst URLs, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
        continue
      if not dst_url_str:
        (dst_url_str, dst_size, dst_crc32c, dst_md5) = (
            self._ParseTmpFileLine(self.sorted_dst_urls_it.next()))
        # Skip past the base URL and normalize slashes so we can compare
        # across clouds/file systems (including Windows).
        dst_url_str_to_check = _EncodeUrl(
            dst_url_str[base_dst_url_len:].replace('\\', '/'))

      if src_url_str_to_check < dst_url_str_to_check:
        # There's no dst object corresponding to src object, so copy src to
        # dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
      elif src_url_str_to_check > dst_url_str_to_check:
        # dst object without a corresponding src object, so remove dst if the
        # -d option was specified.
        if self.delete_extras:
          yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
        dst_url_str = None
      else:
        # There is a dst object corresponding to src object, so check if the
        # objects match.
        if self._ObjectsMatch(
            src_url_str, src_size, src_crc32c, src_md5,
            dst_url_str, dst_size, dst_crc32c, dst_md5):
          # Continue iterating without yielding a _DiffToApply.
          pass
        else:
          yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY)
        src_url_str = None
        dst_url_str = None

    # If the -d option was specified, any files/objects left in the dst
    # iteration should be removed.
    if not self.delete_extras:
      return
    if dst_url_str:
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
      dst_url_str = None
    for line in self.sorted_dst_urls_it:
      (dst_url_str, _, _, _) = self._ParseTmpFileLine(line)
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)


def _RsyncFunc(cls, diff_to_apply, thread_state=None):
  """Worker function for performing the actual copy and remove operations."""
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  dst_url_str = diff_to_apply.dst_url_str
  dst_url = StorageUrlFromString(dst_url_str)
  if diff_to_apply.diff_action == _DiffAction.REMOVE:
    if cls.dryrun:
      cls.logger.info('Would remove %s', dst_url)
    else:
      cls.logger.info('Removing %s', dst_url)
      if dst_url.IsFileUrl():
        os.unlink(dst_url.object_name)
      else:
        try:
          gsutil_api.DeleteObject(
              dst_url.bucket_name, dst_url.object_name,
              generation=dst_url.generation, provider=dst_url.scheme)
        except NotFoundException:
          # If the object happened to be deleted by an external process, this
          # is fine because it moves us closer to the desired state.
          pass
  elif diff_to_apply.diff_action == _DiffAction.COPY:
    src_url_str = diff_to_apply.src_url_str
    src_url = StorageUrlFromString(src_url_str)
    if cls.dryrun:
      cls.logger.info('Would copy %s to %s', src_url, dst_url)
    else:
      try:
        copy_helper.PerformCopy(cls.logger, src_url, dst_url, gsutil_api, cls,
                                _RsyncExceptionHandler,
                                headers=cls.headers)
      except SkipUnsupportedObjectError, e:
        cls.logger.info('Skipping item %s with unsupported object type %s',
                        src_url, e.unsupported_type)
  else:
    raise CommandException('Got unexpected DiffAction (%s)'
                           % diff_to_apply.diff_action)


def _RootListingExceptionHandler(cls, e):
  """Simple exception handler for exceptions during listing URLs to sync."""
  cls.logger.error(str(e))


def _RsyncExceptionHandler(cls, e):
  """Simple exception handler to allow post-completion status."""
  cls.logger.error(str(e))
  cls.op_failure_count += 1
  cls.logger.debug('\n\nEncountered exception while syncing:\n%s\n',
                   traceback.format_exc())


class RsyncCommand(Command):
  """Implementation of gsutil rsync command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'rsync',
      command_name_aliases=[],
      usage_synopsis=_SYNOPSIS,
      min_args=2,
      max_args=2,
      supported_sub_args='cCdenprRUx:',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
      argparse_arguments=[
          CommandArgument.MakeNCloudOrFileURLsArgument(2)
      ]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='rsync',
      help_name_aliases=['sync', 'synchronize'],
      help_type='command_help',
      help_one_line_summary='Synchronize content of two buckets/directories',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )
  total_bytes_transferred = 0

  def _InsistContainer(self, url_str, treat_nonexistent_object_as_subdir):
    """Sanity checks that URL names an existing container.

    Args:
      url_str: URL string to check.
      treat_nonexistent_object_as_subdir: indicates if we should treat a
          non-existent object as a subdir.

    Returns:
      URL for checked string.

    Raises:
      CommandException if url_str doesn't name an existing container.
    """
    (url, have_existing_container) = (
        copy_helper.ExpandUrlToSingleBlr(url_str, self.gsutil_api, self.debug,
                                         self.project_id,
                                         treat_nonexistent_object_as_subdir))
    if not have_existing_container:
      raise CommandException(
          'arg (%s) does not name a directory, bucket, or bucket subdir.'
          % url_str)
    return url

  def RunCommand(self):
    """Command entry point for the rsync command."""
    self._ParseOpts()
    if self.compute_file_checksums and not UsingCrcmodExtension(crcmod):
      self.logger.warn(SLOW_CRCMOD_WARNING)

    src_url = self._InsistContainer(self.args[0], False)
    dst_url = self._InsistContainer(self.args[1], True)

    # Tracks if any copy or rm operations failed.
    self.op_failure_count = 0

    # List of attributes to share/manage across multiple processes in
    # parallel (-m) mode.
    shared_attrs = ['op_failure_count']

    for signal_num in GetCaughtSignals():
      RegisterSignalHandler(signal_num, _HandleSignals)

    # Perform sync requests in parallel (-m) mode, if requested, using
    # configured number of parallel processes and threads. Otherwise,
    # perform requests with sequential function calls in current process.
    diff_iterator = _DiffIterator(self, src_url, dst_url)
    self.logger.info('Starting synchronization')
    try:
      self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler,
                 shared_attrs, arg_checker=_DiffToApplyArgChecker,
                 fail_on_error=True)
    finally:
      CleanUpTempFiles()

    if self.op_failure_count:
      plural_str = 's' if self.op_failure_count > 1 else ''
      raise CommandException(
          '%d file%s/object%s could not be copied/removed.' %
          (self.op_failure_count, plural_str, plural_str))

  def _ParseOpts(self):
    # exclude_symlinks is handled by the Command parent class, so save it in
    # Command state rather than CopyHelperOpts.
    self.exclude_symlinks = False
    # continue_on_error is handled by the Command parent class, so save it in
    # Command state rather than CopyHelperOpts.
    self.continue_on_error = False
    self.delete_extras = False
    preserve_acl = False
    self.compute_file_checksums = False
    self.dryrun = False
    self.exclude_pattern = None
    self.skip_unsupported_objects = False
    # self.recursion_requested is initialized in command.py (so it can be
    # checked in the parent class for all commands).

    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-c':
          self.compute_file_checksums = True
        # Note: In the gsutil cp command, continue-on-error is specified using
        # -c, but here we use -C so that -c can be the checksum option (to be
        # consistent with Unix rsync command options).
        elif o == '-C':
          self.continue_on_error = True
        elif o == '-d':
          self.delete_extras = True
        elif o == '-e':
          self.exclude_symlinks = True
        elif o == '-n':
          self.dryrun = True
        elif o == '-p':
          preserve_acl = True
        elif o == '-r' or o == '-R':
          self.recursion_requested = True
        elif o == '-U':
          self.skip_unsupported_objects = True
        elif o == '-x':
          if not a:
            raise CommandException('Invalid blank exclude filter')
          try:
            self.exclude_pattern = re.compile(a)
          except re.error:
            raise CommandException('Invalid exclude filter (%s)' % a)
    return CreateCopyHelperOpts(
        preserve_acl=preserve_acl,
        skip_unsupported_objects=self.skip_unsupported_objects)