# -*- coding: utf-8 -*-
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for tracker file functionality."""

import errno
import hashlib
import json
import os
import re

from boto import config
from gslib.exception import CommandException
from gslib.util import CreateDirIfNeeded
from gslib.util import GetGsutilStateDir
from gslib.util import ResumableThreshold
from gslib.util import UTF8

# The maximum length of a file name can vary wildly between different
# operating systems, so we always ensure that tracker files are less
# than 100 characters in order to avoid any such issues.
MAX_TRACKER_FILE_NAME_LENGTH = 100


TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT = (
    'Couldn\'t write tracker file (%s): %s. This can happen if gsutil is '
    'configured to save tracker files to an unwritable directory)')


class TrackerFileType(object):
  """String constants naming each kind of tracker file.

  The value is lower-cased and used as the prefix of the on-disk tracker file
  name (see _HashAndReturnPath).
  """
  UPLOAD = 'upload'
  DOWNLOAD = 'download'
  DOWNLOAD_COMPONENT = 'download_component'
  PARALLEL_UPLOAD = 'parallel_upload'
  SLICED_DOWNLOAD = 'sliced_download'
  REWRITE = 'rewrite'


def _ReplacePathSeparators(name):
  """Returns name with every forward slash and backslash replaced by '_'."""
  return re.sub('[/\\\\]', '_', name)


def _HashFilename(filename):
  """Apply a hash function (SHA1) to shorten the passed file name.

  The spec for the hashed file name is as follows:

      TRACKER_<hash>.<trailing>

  where hash is the SHA1 hex digest of the encoded original file name and
  trailing is the last 16 bytes of that encoded name. Max file name lengths
  vary by operating system so the goal of this function is to ensure
  the hashed version takes fewer than 100 characters.

  Args:
    filename: file name to be hashed (unicode, or a byte string that is
        decodable with the UTF8 codec).

  Returns:
    shorter, hashed version of passed file name
  """
  if isinstance(filename, unicode):
    filename = filename.encode(UTF8)
  else:
    # Round-trip through unicode so that byte strings which are not valid in
    # the UTF8 codec are rejected here rather than hashed.
    filename = unicode(filename, UTF8).encode(UTF8)
  m = hashlib.sha1(filename)
  return 'TRACKER_' + m.hexdigest() + '.' + filename[-16:]


def CreateTrackerDirIfNeeded():
  """Looks up or creates the gsutil tracker file directory.

  This is the configured directory where gsutil keeps its resumable transfer
  tracker files. This function creates it if it doesn't already exist.

  Returns:
    The pathname to the tracker directory.
  """
  tracker_dir = config.get(
      'GSUtil', 'resumable_tracker_dir',
      os.path.join(GetGsutilStateDir(), 'tracker-files'))
  CreateDirIfNeeded(tracker_dir)
  return tracker_dir


def GetRewriteTrackerFilePath(src_bucket_name, src_obj_name, dst_bucket_name,
                              dst_obj_name, api_selector):
  """Gets the tracker file name described by the arguments.

  Args:
    src_bucket_name: Source bucket (string).
    src_obj_name: Source object (string).
    dst_bucket_name: Destination bucket (string).
    dst_obj_name: Destination object (string)
    api_selector: API to use for this operation.

  Returns:
    File path to tracker file.
  """
  # Encode the src and dest bucket and object names into the tracker file
  # name.
  res_tracker_file_name = _ReplacePathSeparators(
      'rewrite__%s__%s__%s__%s__%s.token' %
      (src_bucket_name, src_obj_name, dst_bucket_name,
       dst_obj_name, api_selector))

  return _HashAndReturnPath(res_tracker_file_name, TrackerFileType.REWRITE)


def GetTrackerFilePath(dst_url, tracker_file_type, api_selector, src_url=None,
                       component_num=None):
  """Gets the tracker file name described by the arguments.

  Args:
    dst_url: Destination URL for tracker file.
    tracker_file_type: TrackerFileType for this operation.
    api_selector: API to use for this operation.
    src_url: Source URL for the source file name for parallel uploads.
    component_num: Component number if this is a download component, else None.

  Returns:
    File path to tracker file.

  Raises:
    NotImplementedError: if tracker_file_type is TrackerFileType.REWRITE
        (use GetRewriteTrackerFilePath instead).
    ValueError: if tracker_file_type is not a known TrackerFileType value.
  """
  if tracker_file_type == TrackerFileType.UPLOAD:
    # Encode the dest bucket and object name into the tracker file name.
    raw_name = 'resumable_upload__%s__%s__%s.url' % (
        dst_url.bucket_name, dst_url.object_name, api_selector)
  elif tracker_file_type == TrackerFileType.DOWNLOAD:
    # Encode the fully-qualified dest file name into the tracker file name.
    raw_name = 'resumable_download__%s__%s.etag' % (
        os.path.realpath(dst_url.object_name), api_selector)
  elif tracker_file_type == TrackerFileType.DOWNLOAD_COMPONENT:
    # Encode the fully-qualified dest file name and the component number
    # into the tracker file name.
    raw_name = 'resumable_download__%s__%s__%d.etag' % (
        os.path.realpath(dst_url.object_name), api_selector, component_num)
  elif tracker_file_type == TrackerFileType.PARALLEL_UPLOAD:
    # Encode the dest bucket and object names as well as the source file name
    # into the tracker file name.
    raw_name = 'parallel_upload__%s__%s__%s__%s.url' % (
        dst_url.bucket_name, dst_url.object_name, src_url, api_selector)
  elif tracker_file_type == TrackerFileType.SLICED_DOWNLOAD:
    # Encode the fully-qualified dest file name into the tracker file name.
    raw_name = 'sliced_download__%s__%s.etag' % (
        os.path.realpath(dst_url.object_name), api_selector)
  elif tracker_file_type == TrackerFileType.REWRITE:
    # Should use GetRewriteTrackerFilePath instead.
    raise NotImplementedError()
  else:
    # Fail with a clear message instead of an UnboundLocalError below.
    raise ValueError('Unknown tracker file type: %s' % tracker_file_type)

  return _HashAndReturnPath(_ReplacePathSeparators(raw_name),
                            tracker_file_type)


def DeleteDownloadTrackerFiles(dst_url, api_selector):
  """Deletes all tracker files corresponding to an object download.

  Args:
    dst_url: StorageUrl describing the destination file.
    api_selector: The Cloud API implementation used.
  """
  # Delete non-sliced download tracker file.
  DeleteTrackerFile(GetTrackerFilePath(dst_url, TrackerFileType.DOWNLOAD,
                                       api_selector))

  # Delete all sliced download tracker files.
  tracker_files = GetSlicedDownloadTrackerFilePaths(dst_url, api_selector)
  for tracker_file in tracker_files:
    DeleteTrackerFile(tracker_file)


def GetSlicedDownloadTrackerFilePaths(dst_url, api_selector,
                                      num_components=None):
  """Gets a list of sliced download tracker file paths.

  The list consists of the parent tracker file path in index 0, and then
  any existing component tracker files in [1:].

  Args:
    dst_url: Destination URL for tracker file.
    api_selector: API to use for this operation.
    num_components: The number of component tracker files, if already known.
                    If not known, the number will be retrieved from the parent
                    tracker file on disk.
  Returns:
    List of tracker file paths. If num_components is not given and cannot be
    read from the parent tracker file, the list holds only the parent path.
  """
  parallel_tracker_file_path = GetTrackerFilePath(
      dst_url, TrackerFileType.SLICED_DOWNLOAD, api_selector)
  tracker_file_paths = [parallel_tracker_file_path]

  # If we don't know the number of components, check the tracker file.
  if num_components is None:
    try:
      with open(parallel_tracker_file_path, 'r') as tracker_file:
        num_components = json.load(tracker_file)['num_components']
    except (IOError, ValueError, KeyError, TypeError):
      # The parent tracker file is missing, unreadable, not valid JSON, or
      # valid JSON without a 'num_components' entry; in every case the
      # component count is unknown, so report only the parent path.
      return tracker_file_paths

  for i in range(num_components):
    tracker_file_paths.append(GetTrackerFilePath(
        dst_url, TrackerFileType.DOWNLOAD_COMPONENT, api_selector,
        component_num=i))

  return tracker_file_paths


def _HashAndReturnPath(res_tracker_file_name, tracker_file_type):
  """Hashes and returns a tracker file path.

  Args:
    res_tracker_file_name: The tracker file name prior to it being hashed.
    tracker_file_type: The TrackerFileType of res_tracker_file_name.

  Returns:
    Final (hashed) tracker file path.
  """
  resumable_tracker_dir = CreateTrackerDirIfNeeded()
  hashed_tracker_file_name = _HashFilename(res_tracker_file_name)
  tracker_file_name = '%s_%s' % (str(tracker_file_type).lower(),
                                 hashed_tracker_file_name)
  tracker_file_path = '%s%s%s' % (resumable_tracker_dir, os.sep,
                                  tracker_file_name)
  # Internal invariant: type prefix + hashed name must stay short enough for
  # any file system.
  assert len(tracker_file_name) < MAX_TRACKER_FILE_NAME_LENGTH
  return tracker_file_path


def DeleteTrackerFile(tracker_file_name):
  """Deletes the given tracker file if it exists.

  Args:
    tracker_file_name: Tracker file path string; a false value (None or '')
        is ignored.

  Raises:
    OSError: if the file exists but cannot be removed.
  """
  if tracker_file_name and os.path.exists(tracker_file_name):
    try:
      os.unlink(tracker_file_name)
    except OSError as e:
      # The file may vanish between the existence check and the unlink (for
      # example, removed by another process); that outcome is what we wanted.
      if e.errno != errno.ENOENT:
        raise


def HashRewriteParameters(
    src_obj_metadata, dst_obj_metadata, projection, src_generation=None,
    gen_match=None, meta_gen_match=None, canned_acl=None, fields=None,
    max_bytes_per_call=None):
  """Creates an MD5 hex digest of the parameters for a rewrite call.

  Resuming rewrites requires that the input parameters are identical. Thus,
  the rewrite tracker file needs to represent the input parameters. For
  easy comparison, hash the input values. If a user performs a
  same-source/same-destination rewrite via a different command (for example,
  with a changed ACL), the hashes will not match and we will restart the
  rewrite from the beginning.

  Args:
    src_obj_metadata: apitools Object describing source object. Must include
      bucket, name, and etag.
    dst_obj_metadata: apitools Object describing destination object. Must
      include bucket and object name
    projection: Projection used for the API call.
    src_generation: Optional source generation.
    gen_match: Optional generation precondition.
    meta_gen_match: Optional metageneration precondition.
    canned_acl: Optional canned ACL string.
    fields: Optional fields to include in response.
    max_bytes_per_call: Optional maximum bytes rewritten per call.

  Returns:
    MD5 hex digest Hash of the input parameters, or None if required parameters
    are missing.
  """
  if (not src_obj_metadata or
      not src_obj_metadata.bucket or
      not src_obj_metadata.name or
      not src_obj_metadata.etag or
      not dst_obj_metadata or
      not dst_obj_metadata.bucket or
      not dst_obj_metadata.name or
      not projection):
    return None
  md5_hash = hashlib.md5()
  # NOTE(review): the string forms are concatenated without a delimiter, so
  # distinct parameter tuples could in principle hash identically. The scheme
  # is left unchanged so that hashes stored in existing tracker files match.
  for input_param in (
      src_obj_metadata, dst_obj_metadata, projection, src_generation,
      gen_match, meta_gen_match, canned_acl, fields, max_bytes_per_call):
    md5_hash.update(str(input_param))
  return md5_hash.hexdigest()


def ReadRewriteTrackerFile(tracker_file_name, rewrite_params_hash):
  """Attempts to read a rewrite tracker file.

  Args:
    tracker_file_name: Tracker file path string.
    rewrite_params_hash: MD5 hex digest of rewrite call parameters constructed
        by HashRewriteParameters.

  Returns:
    String rewrite_token for resuming rewrite requests if a matching tracker
    file exists, None otherwise (which will result in starting a new rewrite).
  """
  if not rewrite_params_hash:
    return None
  # Check to see if we already have a matching tracker file.
  try:
    with open(tracker_file_name, 'r') as tracker_file:
      existing_hash = tracker_file.readline().rstrip('\n')
      if existing_hash == rewrite_params_hash:
        # Next line is the rewrite token.
        return tracker_file.readline().rstrip('\n')
  except IOError as e:
    # Ignore non-existent file (happens first time a rewrite is attempted).
    if e.errno != errno.ENOENT:
      print('Couldn\'t read Copy tracker file (%s): %s. Restarting copy '
            'from scratch.' %
            (tracker_file_name, e.strerror))
  return None


def WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash,
                            rewrite_token):
  """Writes a rewrite tracker file.

  The file holds the parameter hash on the first line and the rewrite token
  on the second.

  Args:
    tracker_file_name: Tracker file path string.
    rewrite_params_hash: MD5 hex digest of rewrite call parameters constructed
        by HashRewriteParameters.
    rewrite_token: Rewrite token string returned by the service.
  """
  _WriteTrackerFile(tracker_file_name, '%s\n%s\n' % (rewrite_params_hash,
                                                     rewrite_token))


def ReadOrCreateDownloadTrackerFile(src_obj_metadata, dst_url, logger,
                                    api_selector, start_byte,
                                    existing_file_size, component_num=None):
  """Checks for a download tracker file and creates one if it does not exist.

  The methodology for determining the download start point differs between
  normal and sliced downloads. For normal downloads, the existing bytes in
  the file are presumed to be correct and have been previously downloaded from
  the server (if a tracker file exists). In this case, the existing file size
  is used to determine the download start point. For sliced downloads, the
  number of bytes previously retrieved from the server cannot be determined
  from the existing file size, and so the number of bytes known to have been
  previously downloaded is retrieved from the tracker file.

  Args:
    src_obj_metadata: Metadata for the source object. Must include etag and
                      generation.
    dst_url: Destination URL for tracker file.
    logger: For outputting log messages.
    api_selector: API to use for this operation.
    start_byte: The start byte of the byte range for this download.
    existing_file_size: Size of existing file for this download on disk.
    component_num: The component number, if this is a component of a parallel
                   download, else None.

  Returns:
    A (tracker_file_name, download_start_byte) tuple:
    tracker_file_name: The name of the tracker file, if one was used.
    download_start_byte: The first byte that still needs to be downloaded.
  """
  assert src_obj_metadata.etag

  tracker_file_name = None
  if src_obj_metadata.size < ResumableThreshold():
    # Don't create a tracker file for a small download; cross-process resumes
    # won't work, but restarting a small download is inexpensive.
    return tracker_file_name, start_byte

  download_name = dst_url.object_name
  if component_num is None:
    tracker_file_type = TrackerFileType.DOWNLOAD
  else:
    tracker_file_type = TrackerFileType.DOWNLOAD_COMPONENT
    download_name += ' component %d' % component_num

  tracker_file_name = GetTrackerFilePath(dst_url, tracker_file_type,
                                         api_selector,
                                         component_num=component_num)
  # Check to see if we already have a matching tracker file.
  try:
    with open(tracker_file_name, 'r') as tracker_file:
      if tracker_file_type == TrackerFileType.DOWNLOAD:
        etag_value = tracker_file.readline().rstrip('\n')
        if etag_value == src_obj_metadata.etag:
          return tracker_file_name, existing_file_size
      else:
        component_data = json.loads(tracker_file.read())
        if (component_data['etag'] == src_obj_metadata.etag and
            component_data['generation'] == src_obj_metadata.generation):
          return tracker_file_name, component_data['download_start_byte']

    logger.warn('Tracker file doesn\'t match for download of %s. Restarting '
                'download from scratch.' % download_name)

  except (IOError, ValueError, KeyError, TypeError) as e:
    # Ignore non-existent file (happens first time a download
    # is attempted on an object), but warn user for other errors, including
    # a component tracker file whose JSON lacks the expected entries.
    if not (isinstance(e, IOError) and e.errno == errno.ENOENT):
      logger.warn('Couldn\'t read download tracker file (%s): %s. Restarting '
                  'download from scratch.' % (tracker_file_name, str(e)))

  # There wasn't a matching tracker file, so create one and then start the
  # download from scratch.
  if tracker_file_type == TrackerFileType.DOWNLOAD:
    _WriteTrackerFile(tracker_file_name, '%s\n' % src_obj_metadata.etag)
  else:
    WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata,
                                      start_byte)
  return tracker_file_name, start_byte


def WriteDownloadComponentTrackerFile(tracker_file_name, src_obj_metadata,
                                      current_file_pos):
  """Updates or creates a download component tracker file on disk.

  Args:
    tracker_file_name: The name of the tracker file.
    src_obj_metadata: Metadata for the source object. Must include etag and
                      generation.
    current_file_pos: The current position in the file.
  """
  component_data = {'etag': src_obj_metadata.etag,
                    'generation': src_obj_metadata.generation,
                    'download_start_byte': current_file_pos}

  _WriteTrackerFile(tracker_file_name, json.dumps(component_data))


def _WriteTrackerFile(tracker_file_name, data):
  """Creates or overwrites a tracker file, storing the input data.

  The file is created with mode 0600 if it does not exist and truncated if it
  does, so no bytes of a previous, longer tracker file survive the write.

  Args:
    tracker_file_name: Tracker file path string.
    data: String to store in the file.

  Returns:
    False when the write succeeds.

  Raises:
    CommandException: if the tracker file cannot be opened or written.
  """
  try:
    fd = os.open(tracker_file_name,
                 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as tf:
      tf.write(data)
    return False
  except (IOError, OSError) as e:
    RaiseUnwritableTrackerFileException(tracker_file_name, e.strerror)


def RaiseUnwritableTrackerFileException(tracker_file_name, error_str):
  """Raises an exception when unable to write the tracker file.

  Args:
    tracker_file_name: Tracker file path string.
    error_str: Description of the underlying error.

  Raises:
    CommandException: always.
  """
  raise CommandException(TRACKER_FILE_UNWRITABLE_EXCEPTION_TEXT %
                         (tracker_file_name, error_str))