# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for hashing functionality."""

import base64
import binascii
from hashlib import md5
import os

from boto import config
import crcmod

from gslib.exception import CommandException
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
from gslib.util import MIN_SIZE_COMPUTE_LOGGING
from gslib.util import TRANSFER_BUFFER_SIZE
from gslib.util import UsingCrcmodExtension


SLOW_CRCMOD_WARNING = """
WARNING: You have requested checksumming but your crcmod installation isn't
using the module's C extension, so checksumming will run very slowly. For help
installing the extension, please see:
  $ gsutil help crcmod
"""


_SLOW_CRCMOD_DOWNLOAD_WARNING = """
WARNING: Downloading this composite object requires integrity checking with
CRC32c, but your crcmod installation isn't using the module's C extension,
so the hash computation will likely throttle download performance. For help
installing the extension, please see:
  $ gsutil help crcmod
To disable slow integrity checking, see the "check_hashes" option in your
boto config file.
"""

_SLOW_CRC_EXCEPTION_TEXT = """
Downloading this composite object requires integrity checking with CRC32c,
but your crcmod installation isn't using the module's C extension, so the
hash computation will likely throttle download performance. For help
installing the extension, please see:

  $ gsutil help crcmod

To download regardless of crcmod performance or to skip slow integrity
checks, see the "check_hashes" option in your boto config file.

NOTE: It is strongly recommended that you not disable integrity checks. Doing so
could allow data corruption to go undetected during uploading/downloading."""


_NO_HASH_CHECK_WARNING = """
WARNING: This download will not be validated since your crcmod installation
doesn't use the module's C extension, so the hash computation would likely
throttle download performance. For help in installing the extension, please
see:
  $ gsutil help crcmod
To force integrity checking, see the "check_hashes" option in your boto config
file.
"""


# Configuration values for hashing.
CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail'
CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip'
CHECK_HASH_ALWAYS = 'always'
CHECK_HASH_NEVER = 'never'
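# Illustrative boto config snippet (section and option names match the
# config.get() calls below; the values are the constants above):
#   [GSUtil]
#   check_hashes = if_fast_else_fail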

# Table storing polynomial values of x^(2^k) mod CASTAGNOLI_POLY for all k < 31,
# where x^(2^k) and CASTAGNOLI_POLY are both considered polynomials. This is
# sufficient since x^(2^31) mod CASTAGNOLI_POLY = x.
X_POW_2K_TABLE = [2, 4, 16, 256, 65536, 517762881, 984302966,
                  408362264, 1503875210, 2862076957, 3884826397, 1324787473,
                  621200174, 1758783527, 1416537776, 1180494764, 648569364,
                  2521473789, 994858823, 1728245375, 3498467999, 4059169852,
                  3345064394, 2828422810, 2429203150, 3336788029, 860151998,
                  2102628683, 1033187991, 4243778976, 1123580069]
# Castagnoli polynomial and its degree.
CASTAGNOLI_POLY = 4812730177
DEGREE = 32


def ConcatCrc32c(crc_a, crc_b, num_bytes_in_b):
  """Computes CRC32C for concat(A, B) given crc(A), crc(B) and len(B).

  An explanation of the algorithm can be found at
  crcutil.googlecode.com/files/crc-doc.1.0.pdf.

  Args:
    crc_a: A 32-bit integer representing crc(A) with least-significant
           coefficient first.
    crc_b: Same as crc_a.
    num_bytes_in_b: Length of B in bytes.

  Returns:
    CRC32C for concat(A, B)
  """
  if not num_bytes_in_b:
    return crc_a

  return _ExtendByZeros(crc_a, 8 * num_bytes_in_b) ^ crc_b

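# Illustrative check of ConcatCrc32c (not executed; assumes crcmod is
# installed): combining the CRCs of two chunks should match hashing the
# concatenated bytes directly.
#   crc_a = crcmod.predefined.Crc('crc-32c')
#   crc_a.update('hello')
#   crc_b = crcmod.predefined.Crc('crc-32c')
#   crc_b.update('world')
#   whole = crcmod.predefined.Crc('crc-32c')
#   whole.update('helloworld')
#   assert ConcatCrc32c(crc_a.crcValue, crc_b.crcValue, 5) == whole.crcValue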

def _CrcMultiply(p, q):
  """Multiplies two polynomials together modulo CASTAGNOLI_POLY.

  Args:
    p: The first polynomial.
    q: The second polynomial.

  Returns:
    Result of the multiplication.
  """

  result = 0
  top_bit = 1 << DEGREE
  for _ in range(DEGREE):
    if p & 1:
      result ^= q
    q <<= 1
    if q & top_bit:
      q ^= CASTAGNOLI_POLY
    p >>= 1
  return result

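# Illustrative sanity check (not executed): because X_POW_2K_TABLE[k] holds
# x^(2^k) mod CASTAGNOLI_POLY, squaring entry k yields entry k+1, e.g.:
#   assert _CrcMultiply(X_POW_2K_TABLE[4],
#                       X_POW_2K_TABLE[4]) == X_POW_2K_TABLE[5]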

def _ExtendByZeros(crc, num_bits):
  """Given crc representing polynomial P(x), compute P(x)*x^num_bits.

  Args:
    crc: CRC representing polynomial P(x).
    num_bits: Number of zero bits to extend P(x) by.

  Returns:
    P(x)*x^num_bits
  """
  def _ReverseBits32(crc):
    return int('{0:032b}'.format(crc)[::-1], 2)
  crc = _ReverseBits32(crc)
  i = 0

  while num_bits != 0:
    if num_bits & 1:
      crc = _CrcMultiply(crc, X_POW_2K_TABLE[i % len(X_POW_2K_TABLE)])
    i += 1
    num_bits >>= 1
  crc = _ReverseBits32(crc)
  return crc


def _CalculateHashFromContents(fp, hash_alg):
  """Calculates a hex digest of the contents of a seekable stream.

  This function resets the file pointer to position 0.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in hex string format.
  """
  hash_dict = {'placeholder': hash_alg}
  fp.seek(0)
  CalculateHashesFromContents(fp, hash_dict)
  fp.seek(0)
  return hash_dict['placeholder'].hexdigest()


def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
  """Calculates hashes of the contents of a file.

  Args:
    fp: An already-open file object (stream will be consumed).
    hash_dict: Dict of (string alg_name: initialized hashing class).
        Each hashing class will be populated with digests upon return.
    callback_processor: Optional callback processing class that implements
        Progress(integer amount of bytes processed).
  """
  while True:
    data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
    if not data:
      break
    for hash_alg in hash_dict.itervalues():
      hash_alg.update(data)
    if callback_processor:
      callback_processor.Progress(len(data))

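# Illustrative single-pass use (not executed; 'file.bin' is a placeholder):
# both digesters are fed from one read of the stream.
#   digesters = {'md5': md5(), 'crc32c': crcmod.predefined.Crc('crc-32c')}
#   with open('file.bin', 'rb') as fp:
#     CalculateHashesFromContents(fp, digesters)
#   md5_hex = digesters['md5'].hexdigest()
#   crc32c_b64 = Base64EncodeHash(digesters['crc32c'].hexdigest())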

def CalculateB64EncodedCrc32cFromContents(fp):
  """Calculates a base64 CRC32c checksum of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    CRC32c checksum of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(
      fp, crcmod.predefined.Crc('crc-32c'))


def CalculateB64EncodedMd5FromContents(fp):
  """Calculates a base64 MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(fp, md5())


def CalculateMd5FromContents(fp):
  """Calculates a hex MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in hex format.
  """
  return _CalculateHashFromContents(fp, md5())


def Base64EncodeHash(digest_value):
  """Returns the base64-encoded version of the input hex digest value."""
  return base64.encodestring(binascii.unhexlify(digest_value)).rstrip('\n')


def Base64ToHexHash(base64_hash):
  """Returns the hex digest value of the input base64-encoded hash.

  Args:
    base64_hash: Base64-encoded hash, which may contain newlines and single or
        double quotes.

  Returns:
    Hex digest of the input argument.
  """
  return binascii.hexlify(base64.decodestring(base64_hash.strip('\n"\'')))

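# Illustrative round trip between the two encodings (not executed; the hex
# value shown is the MD5 of the empty string):
#   hex_digest = 'd41d8cd98f00b204e9800998ecf8427e'
#   assert Base64ToHexHash(Base64EncodeHash(hex_digest)) == hex_digest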

def _CalculateB64EncodedHashFromContents(fp, hash_alg):
  """Calculates a base64 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in base64 format.
  """
  return Base64EncodeHash(_CalculateHashFromContents(fp, hash_alg))


def GetUploadHashAlgs():
  """Returns a dict of hash algorithms for validating an uploaded object.

  This is for use only with single object uploads, not compose operations
  such as those used by parallel composite uploads (though it can be used to
  validate the individual components).

  Returns:
    dict of (algorithm_name: hash_algorithm)
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}
  return {'md5': md5}

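# Illustrative use (not executed): instantiate a fresh digester per upload
# from the returned constructors.
#   digesters = dict(
#       (alg, alg_constructor())
#       for alg, alg_constructor in GetUploadHashAlgs().iteritems())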

def GetDownloadHashAlgs(logger, consider_md5=False, consider_crc32c=False):
  """Returns a dict of hash algorithms for validating an object.

  Args:
    logger: logging.Logger for outputting log messages.
    consider_md5: If True, consider using an MD5 hash.
    consider_crc32c: If True, consider using a CRC32c hash.

  Returns:
    Dict of (string, hash algorithm).

  Raises:
    CommandException if hash algorithms satisfying the boto config file
    cannot be returned.
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}

  hash_algs = {}
  if consider_md5:
    hash_algs['md5'] = md5
  elif consider_crc32c:
    # If the cloud provider supplies a CRC, we'll compute a checksum to
    # validate if we're using a native crcmod installation and MD5 isn't
    # offered as an alternative.
    if UsingCrcmodExtension(crcmod):
      hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
    elif not hash_algs:
      if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL:
        raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
      elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP:
        logger.warn(_NO_HASH_CHECK_WARNING)
      elif check_hashes_config == CHECK_HASH_ALWAYS:
        logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
        hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
      else:
        raise CommandException(
            'Your boto config \'check_hashes\' option is misconfigured.')

  return hash_algs

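# Illustrative use (not executed; 'logger' is assumed to be a configured
# logging.Logger): the returned constructors are called to build digesters.
#   hash_algs = GetDownloadHashAlgs(logger, consider_md5=True)
#   digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs)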

class HashingFileUploadWrapper(object):
  """Wraps an input stream in a hash digester and exposes a stream interface.

  This class provides integrity checking during file uploads via the
  following properties:

  Calls to read will appropriately update digesters with all bytes read.
  Calls to seek (assuming it is supported by the wrapped stream) using
      os.SEEK_SET will catch up / reset the digesters to the specified
      position. If seek is called with a different os.SEEK mode, the caller
      must return to the original position using os.SEEK_SET before further
      reads.
  Calls to seek are fast if the desired position is equal to the position at
      the beginning of the last read call (we only need to re-hash bytes
      from that point on).
  """

  def __init__(self, stream, digesters, hash_algs, src_url, logger):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      digesters: dict of {string: hash digester} containing digesters, where
          string is the name of the hash algorithm.
      hash_algs: dict of {string: hash algorithm} for resetting and
          recalculating digesters. String is the name of the hash algorithm.
      src_url: Source FileUrl that is being copied.
      logger: For outputting log messages.
    """
    if not digesters:
      raise CommandException('HashingFileUploadWrapper used with no digesters.')
    elif not hash_algs:
      raise CommandException('HashingFileUploadWrapper used with no hash_algs.')

    self._orig_fp = stream
    self._digesters = digesters
    self._src_url = src_url
    self._logger = logger
    self._seek_away = None

    self._digesters_previous = {}
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
    self._digesters_previous_mark = 0
    self._digesters_current_mark = 0
    self._hash_algs = hash_algs

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped file pointer and calculates hash digests.

    Args:
      size: The number of bytes to read. If omitted or negative, the entire
          contents of the file will be read, hashed, and returned.

    Returns:
      Bytes from the wrapped stream.

    Raises:
      CommandException if the position of the wrapped stream is unknown.
    """
    if self._seek_away is not None:
      raise CommandException('Read called on hashing file pointer in an '
                             'unknown position; cannot correctly compute '
                             'digest.')

    data = self._orig_fp.read(size)
    self._digesters_previous_mark = self._digesters_current_mark
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
      self._digesters[alg].update(data)
    self._digesters_current_mark += len(data)
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._orig_fp.tell()

  def seekable(self):  # pylint: disable=invalid-name
    """Returns True if the stream is seekable."""
    return self._orig_fp.seekable()

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks in the wrapped file pointer and catches up hash digests.

    Args:
      offset: The offset to seek to.
      whence: One of os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END.

    Returns:
      Return value from the wrapped stream's seek call.
    """
    if whence != os.SEEK_SET:
      # We do not catch up hashes for non-absolute seeks, and rely on the
      # caller to seek to an absolute position before reading.
      self._seek_away = self._orig_fp.tell()

    else:
      # Hashes will be correct and it's safe to call read().
      self._seek_away = None
      if offset < self._digesters_previous_mark:
        # This is earlier than our earliest saved digest, so we need to
        # reset the digesters and scan from the beginning.
        for alg in self._digesters:
          self._digesters[alg] = self._hash_algs[alg]()
        self._digesters_current_mark = 0
        self._orig_fp.seek(0)
        self._CatchUp(offset)

      elif offset == self._digesters_previous_mark:
        # Just load the saved digests.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]

      elif offset < self._digesters_current_mark:
        # Reset the position to our previous digest and scan forward.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]
        self._orig_fp.seek(self._digesters_previous_mark)
        self._CatchUp(offset - self._digesters_previous_mark)

      else:
        # Scan forward from our current digest and position.
        self._orig_fp.seek(self._digesters_current_mark)
        self._CatchUp(offset - self._digesters_current_mark)

    return self._orig_fp.seek(offset, whence)

  def _CatchUp(self, bytes_to_read):
    """Catches up the hash digests without returning data, using little memory.

    Before calling this function, self._digesters_current_mark should be
    updated to the current position of the original stream, and
    self._digesters should be current to that point (but no further).

    Args:
      bytes_to_read: Number of bytes to catch up from the original stream.
    """
    if self._orig_fp.tell() != self._digesters_current_mark:
      raise CommandException(
          'Invalid mark when catching up hashes. Stream position %s, hash '
          'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))

    for alg in self._digesters:
      if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      self._digesters_previous[alg] = self._digesters[alg].copy()

    self._digesters_previous_mark = self._digesters_current_mark
    bytes_remaining = bytes_to_read
    bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    while bytes_this_round:
      data = self._orig_fp.read(bytes_this_round)
      bytes_remaining -= bytes_this_round
      for alg in self._digesters:
        self._digesters[alg].update(data)
      bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    self._digesters_current_mark += bytes_to_read

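# Illustrative use of HashingFileUploadWrapper (not executed; 'file.bin',
# src_url, and logger are placeholders for a real FileUrl and Logger):
#   digesters = {'md5': md5()}
#   hash_algs = {'md5': md5}
#   fp = open('file.bin', 'rb')
#   wrapped = HashingFileUploadWrapper(fp, digesters, hash_algs, src_url,
#                                      logger)
#   data = wrapped.read(TRANSFER_BUFFER_SIZE)  # digesters updated with data
#   wrapped.seek(0)                            # digesters reset / caught up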