1# -*- coding: utf-8 -*- 2# Copyright 2014 Google Inc. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Helper functions for hashing functionality.""" 16 17import base64 18import binascii 19from hashlib import md5 20import os 21 22from boto import config 23import crcmod 24 25from gslib.exception import CommandException 26from gslib.util import DEFAULT_FILE_BUFFER_SIZE 27from gslib.util import MIN_SIZE_COMPUTE_LOGGING 28from gslib.util import TRANSFER_BUFFER_SIZE 29from gslib.util import UsingCrcmodExtension 30 31 32SLOW_CRCMOD_WARNING = """ 33WARNING: You have requested checksumming but your crcmod installation isn't 34using the module's C extension, so checksumming will run very slowly. For help 35installing the extension, please see: 36 $ gsutil help crcmod 37""" 38 39 40_SLOW_CRCMOD_DOWNLOAD_WARNING = """ 41WARNING: Downloading this composite object requires integrity checking with 42CRC32c, but your crcmod installation isn't using the module's C extension, 43so the hash computation will likely throttle download performance. For help 44installing the extension, please see: 45 $ gsutil help crcmod 46To disable slow integrity checking, see the "check_hashes" option in your 47boto config file. 48""" 49 50_SLOW_CRC_EXCEPTION_TEXT = """ 51Downloading this composite object requires integrity checking with CRC32c, 52but your crcmod installation isn't using the module's C extension, so the 53hash computation will likely throttle download performance. For help 54installing the extension, please see: 55 56 $ gsutil help crcmod 57 58To download regardless of crcmod performance or to skip slow integrity 59checks, see the "check_hashes" option in your boto config file. 60 61NOTE: It is strongly recommended that you not disable integrity checks. Doing so 62could allow data corruption to go undetected during uploading/downloading.""" 63 64 65_NO_HASH_CHECK_WARNING = """ 66WARNING: This download will not be validated since your crcmod installation 67doesn't use the module's C extension, so the hash computation would likely 68throttle download performance. For help in installing the extension, please 69see: 70 $ gsutil help crcmod 71To force integrity checking, see the "check_hashes" option in your boto config 72file. 73""" 74 75 76# Configuration values for hashing. 77CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail' 78CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip' 79CHECK_HASH_ALWAYS = 'always' 80CHECK_HASH_NEVER = 'never' 81 82# Table storing polynomial values of x^(2^k) mod CASTAGNOLI_POLY for all k < 31, 83# where x^(2^k) and CASTAGNOLI_POLY are both considered polynomials. This is 84# sufficient since x^(2^31) mod CASTAGNOLI_POLY = x. 85X_POW_2K_TABLE = [2, 4, 16, 256, 65536, 517762881, 984302966, 86 408362264, 1503875210, 2862076957, 3884826397, 1324787473, 87 621200174, 1758783527, 1416537776, 1180494764, 648569364, 88 2521473789, 994858823, 1728245375, 3498467999, 4059169852, 89 3345064394, 2828422810, 2429203150, 3336788029, 860151998, 90 2102628683, 1033187991, 4243778976, 1123580069] 91# Castagnoli polynomial and its degree. 92CASTAGNOLI_POLY = 4812730177 93DEGREE = 32 94 95 96def ConcatCrc32c(crc_a, crc_b, num_bytes_in_b): 97 """Computes CRC32C for concat(A, B) given crc(A), crc(B) and len(B). 98 99 An explanation of the algorithm can be found at 100 crcutil.googlecode.com/files/crc-doc.1.0.pdf. 101 102 Args: 103 crc_a: A 32-bit integer representing crc(A) with least-significant 104 coefficient first. 105 crc_b: Same as crc_a. 106 num_bytes_in_b: Length of B in bytes. 107 108 Returns: 109 CRC32C for concat(A, B) 110 """ 111 if not num_bytes_in_b: 112 return crc_a 113 114 return _ExtendByZeros(crc_a, 8 * num_bytes_in_b) ^ crc_b 115 116 117def _CrcMultiply(p, q): 118 """Multiplies two polynomials together modulo CASTAGNOLI_POLY. 119 120 Args: 121 p: The first polynomial. 122 q: The second polynomial. 123 124 Returns: 125 Result of the multiplication. 126 """ 127 128 result = 0 129 top_bit = 1 << DEGREE 130 for _ in range(DEGREE): 131 if p & 1: 132 result ^= q 133 q <<= 1 134 if q & top_bit: 135 q ^= CASTAGNOLI_POLY 136 p >>= 1 137 return result 138 139 140def _ExtendByZeros(crc, num_bits): 141 """Given crc representing polynomial P(x), compute P(x)*x^num_bits. 142 143 Args: 144 crc: crc respresenting polynomial P(x). 145 num_bits: number of bits in crc. 146 147 Returns: 148 P(x)*x^num_bits 149 """ 150 def _ReverseBits32(crc): 151 return int('{0:032b}'.format(crc, width=32)[::-1], 2) 152 crc = _ReverseBits32(crc) 153 i = 0 154 155 while num_bits != 0: 156 if num_bits & 1: 157 crc = _CrcMultiply(crc, X_POW_2K_TABLE[i % len(X_POW_2K_TABLE)]) 158 i += 1 159 num_bits >>= 1 160 crc = _ReverseBits32(crc) 161 return crc 162 163 164def _CalculateHashFromContents(fp, hash_alg): 165 """Calculates a base64 digest of the contents of a seekable stream. 166 167 This function resets the file pointer to position 0. 168 169 Args: 170 fp: An already-open file object. 171 hash_alg: Instance of hashing class initialized to start state. 172 173 Returns: 174 Hash of the stream in hex string format. 175 """ 176 hash_dict = {'placeholder': hash_alg} 177 fp.seek(0) 178 CalculateHashesFromContents(fp, hash_dict) 179 fp.seek(0) 180 return hash_dict['placeholder'].hexdigest() 181 182 183def CalculateHashesFromContents(fp, hash_dict, callback_processor=None): 184 """Calculates hashes of the contents of a file. 185 186 Args: 187 fp: An already-open file object (stream will be consumed). 188 hash_dict: Dict of (string alg_name: initialized hashing class) 189 Hashing class will be populated with digests upon return. 190 callback_processor: Optional callback processing class that implements 191 Progress(integer amount of bytes processed). 192 """ 193 while True: 194 data = fp.read(DEFAULT_FILE_BUFFER_SIZE) 195 if not data: 196 break 197 for hash_alg in hash_dict.itervalues(): 198 hash_alg.update(data) 199 if callback_processor: 200 callback_processor.Progress(len(data)) 201 202 203def CalculateB64EncodedCrc32cFromContents(fp): 204 """Calculates a base64 CRC32c checksum of the contents of a seekable stream. 205 206 This function sets the stream position 0 before and after calculation. 207 208 Args: 209 fp: An already-open file object. 210 211 Returns: 212 CRC32c checksum of the file in base64 format. 213 """ 214 return _CalculateB64EncodedHashFromContents( 215 fp, crcmod.predefined.Crc('crc-32c')) 216 217 218def CalculateB64EncodedMd5FromContents(fp): 219 """Calculates a base64 MD5 digest of the contents of a seekable stream. 220 221 This function sets the stream position 0 before and after calculation. 222 223 Args: 224 fp: An already-open file object. 225 226 Returns: 227 MD5 digest of the file in base64 format. 228 """ 229 return _CalculateB64EncodedHashFromContents(fp, md5()) 230 231 232def CalculateMd5FromContents(fp): 233 """Calculates a base64 MD5 digest of the contents of a seekable stream. 234 235 This function sets the stream position 0 before and after calculation. 236 237 Args: 238 fp: An already-open file object. 239 240 Returns: 241 MD5 digest of the file in hex format. 242 """ 243 return _CalculateHashFromContents(fp, md5()) 244 245 246def Base64EncodeHash(digest_value): 247 """Returns the base64-encoded version of the input hex digest value.""" 248 return base64.encodestring(binascii.unhexlify(digest_value)).rstrip('\n') 249 250 251def Base64ToHexHash(base64_hash): 252 """Returns the hex digest value of the input base64-encoded hash. 253 254 Args: 255 base64_hash: Base64-encoded hash, which may contain newlines and single or 256 double quotes. 257 258 Returns: 259 Hex digest of the input argument. 260 """ 261 return binascii.hexlify(base64.decodestring(base64_hash.strip('\n"\''))) 262 263 264def _CalculateB64EncodedHashFromContents(fp, hash_alg): 265 """Calculates a base64 digest of the contents of a seekable stream. 266 267 This function sets the stream position 0 before and after calculation. 268 269 Args: 270 fp: An already-open file object. 271 hash_alg: Instance of hashing class initialized to start state. 272 273 Returns: 274 Hash of the stream in base64 format. 275 """ 276 return Base64EncodeHash(_CalculateHashFromContents(fp, hash_alg)) 277 278 279def GetUploadHashAlgs(): 280 """Returns a dict of hash algorithms for validating an uploaded object. 281 282 This is for use only with single object uploads, not compose operations 283 such as those used by parallel composite uploads (though it can be used to 284 validate the individual components). 285 286 Returns: 287 dict of (algorithm_name: hash_algorithm) 288 """ 289 check_hashes_config = config.get( 290 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL) 291 if check_hashes_config == 'never': 292 return {} 293 return {'md5': md5} 294 295 296def GetDownloadHashAlgs(logger, consider_md5=False, consider_crc32c=False): 297 """Returns a dict of hash algorithms for validating an object. 298 299 Args: 300 logger: logging.Logger for outputting log messages. 301 consider_md5: If True, consider using a md5 hash. 302 consider_crc32c: If True, consider using a crc32c hash. 303 304 Returns: 305 Dict of (string, hash algorithm). 306 307 Raises: 308 CommandException if hash algorithms satisfying the boto config file 309 cannot be returned. 310 """ 311 check_hashes_config = config.get( 312 'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL) 313 if check_hashes_config == CHECK_HASH_NEVER: 314 return {} 315 316 hash_algs = {} 317 if consider_md5: 318 hash_algs['md5'] = md5 319 elif consider_crc32c: 320 # If the cloud provider supplies a CRC, we'll compute a checksum to 321 # validate if we're using a native crcmod installation and MD5 isn't 322 # offered as an alternative. 323 if UsingCrcmodExtension(crcmod): 324 hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c') 325 elif not hash_algs: 326 if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL: 327 raise CommandException(_SLOW_CRC_EXCEPTION_TEXT) 328 elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP: 329 logger.warn(_NO_HASH_CHECK_WARNING) 330 elif check_hashes_config == CHECK_HASH_ALWAYS: 331 logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING) 332 hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c') 333 else: 334 raise CommandException( 335 'Your boto config \'check_hashes\' option is misconfigured.') 336 337 return hash_algs 338 339 340class HashingFileUploadWrapper(object): 341 """Wraps an input stream in a hash digester and exposes a stream interface. 342 343 This class provides integrity checking during file uploads via the 344 following properties: 345 346 Calls to read will appropriately update digesters with all bytes read. 347 Calls to seek (assuming it is supported by the wrapped stream) using 348 os.SEEK_SET will catch up / reset the digesters to the specified 349 position. If seek is called with a different os.SEEK mode, the caller 350 must return to the original position using os.SEEK_SET before further 351 reads. 352 Calls to seek are fast if the desired position is equal to the position at 353 the beginning of the last read call (we only need to re-hash bytes 354 from that point on). 355 """ 356 357 def __init__(self, stream, digesters, hash_algs, src_url, logger): 358 """Initializes the wrapper. 359 360 Args: 361 stream: Input stream. 362 digesters: dict of {string: hash digester} containing digesters, where 363 string is the name of the hash algorithm. 364 hash_algs: dict of {string: hash algorithm} for resetting and 365 recalculating digesters. String is the name of the hash algorithm. 366 src_url: Source FileUrl that is being copied. 367 logger: For outputting log messages. 368 """ 369 if not digesters: 370 raise CommandException('HashingFileUploadWrapper used with no digesters.') 371 elif not hash_algs: 372 raise CommandException('HashingFileUploadWrapper used with no hash_algs.') 373 374 self._orig_fp = stream 375 self._digesters = digesters 376 self._src_url = src_url 377 self._logger = logger 378 self._seek_away = None 379 380 self._digesters_previous = {} 381 for alg in self._digesters: 382 self._digesters_previous[alg] = self._digesters[alg].copy() 383 self._digesters_previous_mark = 0 384 self._digesters_current_mark = 0 385 self._hash_algs = hash_algs 386 387 def read(self, size=-1): # pylint: disable=invalid-name 388 """"Reads from the wrapped file pointer and calculates hash digests. 389 390 Args: 391 size: The amount of bytes to read. If ommited or negative, the entire 392 contents of the file will be read, hashed, and returned. 393 394 Returns: 395 Bytes from the wrapped stream. 396 397 Raises: 398 CommandException if the position of the wrapped stream is unknown. 399 """ 400 if self._seek_away is not None: 401 raise CommandException('Read called on hashing file pointer in an ' 402 'unknown position; cannot correctly compute ' 403 'digest.') 404 405 data = self._orig_fp.read(size) 406 self._digesters_previous_mark = self._digesters_current_mark 407 for alg in self._digesters: 408 self._digesters_previous[alg] = self._digesters[alg].copy() 409 self._digesters[alg].update(data) 410 self._digesters_current_mark += len(data) 411 return data 412 413 def tell(self): # pylint: disable=invalid-name 414 """Returns the current stream position.""" 415 return self._orig_fp.tell() 416 417 def seekable(self): # pylint: disable=invalid-name 418 """Returns true if the stream is seekable.""" 419 return self._orig_fp.seekable() 420 421 def seek(self, offset, whence=os.SEEK_SET): # pylint: disable=invalid-name 422 """Seeks in the wrapped file pointer and catches up hash digests. 423 424 Args: 425 offset: The offset to seek to. 426 whence: os.SEEK_CUR, or SEEK_END, SEEK_SET. 427 428 Returns: 429 Return value from the wrapped stream's seek call. 430 """ 431 if whence != os.SEEK_SET: 432 # We do not catch up hashes for non-absolute seeks, and rely on the 433 # caller to seek to an absolute position before reading. 434 self._seek_away = self._orig_fp.tell() 435 436 else: 437 # Hashes will be correct and it's safe to call read(). 438 self._seek_away = None 439 if offset < self._digesters_previous_mark: 440 # This is earlier than our earliest saved digest, so we need to 441 # reset the digesters and scan from the beginning. 442 for alg in self._digesters: 443 self._digesters[alg] = self._hash_algs[alg]() 444 self._digesters_current_mark = 0 445 self._orig_fp.seek(0) 446 self._CatchUp(offset) 447 448 elif offset == self._digesters_previous_mark: 449 # Just load the saved digests. 450 self._digesters_current_mark = self._digesters_previous_mark 451 for alg in self._digesters: 452 self._digesters[alg] = self._digesters_previous[alg] 453 454 elif offset < self._digesters_current_mark: 455 # Reset the position to our previous digest and scan forward. 456 self._digesters_current_mark = self._digesters_previous_mark 457 for alg in self._digesters: 458 self._digesters[alg] = self._digesters_previous[alg] 459 self._orig_fp.seek(self._digesters_previous_mark) 460 self._CatchUp(offset - self._digesters_previous_mark) 461 462 else: 463 # Scan forward from our current digest and position. 464 self._orig_fp.seek(self._digesters_current_mark) 465 self._CatchUp(offset - self._digesters_current_mark) 466 467 return self._orig_fp.seek(offset, whence) 468 469 def _CatchUp(self, bytes_to_read): 470 """Catches up hashes, but does not return data and uses little memory. 471 472 Before calling this function, digesters_current_mark should be updated 473 to the current location of the original stream and the self._digesters 474 should be current to that point (but no further). 475 476 Args: 477 bytes_to_read: Number of bytes to catch up from the original stream. 478 """ 479 if self._orig_fp.tell() != self._digesters_current_mark: 480 raise CommandException( 481 'Invalid mark when catching up hashes. Stream position %s, hash ' 482 'position %s' % (self._orig_fp.tell(), self._digesters_current_mark)) 483 484 for alg in self._digesters: 485 if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING: 486 self._logger.info('Catching up %s for %s...', alg, 487 self._src_url.url_string) 488 self._digesters_previous[alg] = self._digesters[alg].copy() 489 490 self._digesters_previous_mark = self._digesters_current_mark 491 bytes_remaining = bytes_to_read 492 bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE) 493 while bytes_this_round: 494 data = self._orig_fp.read(bytes_this_round) 495 bytes_remaining -= bytes_this_round 496 for alg in self._digesters: 497 self._digesters[alg].update(data) 498 bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE) 499 self._digesters_current_mark += bytes_to_read 500