# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrappers for gsutil, for basic interaction with Google Cloud Storage."""

import collections
import contextlib
import hashlib
import logging
import os
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import time

import py_utils
from py_utils import cloud_storage_global_lock  # pylint: disable=unused-import
from py_utils import lock

# Do a no-op import here so that cloud_storage_global_lock dep is picked up
# by https://cs.chromium.org/chromium/src/build/android/test_runner.pydeps.
# TODO(nedn, jbudorick): figure out a way to get rid of this ugly hack.

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


PUBLIC_BUCKET = 'chromium-telemetry'
PARTNER_BUCKET = 'chrome-partner-telemetry'
INTERNAL_BUCKET = 'chrome-telemetry'
TELEMETRY_OUTPUT = 'chrome-telemetry-output'

# Use an ordered dict to make sure that the bucket aliases are ordered from
# the most open to the most restrictive bucket.
BUCKET_ALIASES = collections.OrderedDict((
    ('public', PUBLIC_BUCKET),
    ('partner', PARTNER_BUCKET),
    ('internal', INTERNAL_BUCKET),
    ('output', TELEMETRY_OUTPUT),
))

BUCKET_ALIAS_NAMES = list(BUCKET_ALIASES.keys())


_GSUTIL_PATH = os.path.join(py_utils.GetCatapultDir(), 'third_party', 'gsutil',
                            'gsutil')

# TODO(tbarzic): A workaround for http://crbug.com/386416 and
#     http://crbug.com/359293. See |_RunCommand|.
_CROS_GSUTIL_HOME_WAR = '/home/chromeos-test/'


# If the DISABLE_CLOUD_STORAGE_IO environment variable is set to '1', any
# method call that invokes cloud storage network IO will raise an exception.
DISABLE_CLOUD_STORAGE_IO = 'DISABLE_CLOUD_STORAGE_IO'

# The maximum number of seconds to wait to acquire the pseudo lock for a cloud
# storage file before raising an exception.
LOCK_ACQUISITION_TIMEOUT = 10


class CloudStorageError(Exception):

  @staticmethod
  def _GetConfigInstructions():
    command = _GSUTIL_PATH
    if py_utils.IsRunningOnCrosDevice():
      command = 'HOME=%s %s' % (_CROS_GSUTIL_HOME_WAR, _GSUTIL_PATH)
    return ('To configure your credentials:\n'
            '  1. Run "%s config" and follow its instructions.\n'
            '  2. If you have a @google.com account, use that account.\n'
            '  3. For the project-id, just enter 0.' % command)


class PermissionError(CloudStorageError):

  def __init__(self):
    super(PermissionError, self).__init__(
        'Attempted to access a file from Cloud Storage but you don\'t '
        'have permission. ' + self._GetConfigInstructions())


class CredentialsError(CloudStorageError):

  def __init__(self):
    super(CredentialsError, self).__init__(
        'Attempted to access a file from Cloud Storage but you have no '
        'configured credentials. ' + self._GetConfigInstructions())


class CloudStorageIODisabled(CloudStorageError):
  pass


class NotFoundError(CloudStorageError):
  pass


class ServerError(CloudStorageError):
  pass


# TODO(tonyg/dtu): Can this be replaced with distutils.spawn.find_executable()?
def _FindExecutableInPath(relative_executable_path, *extra_search_paths):
  search_paths = list(extra_search_paths) + os.environ['PATH'].split(os.pathsep)
  for search_path in search_paths:
    executable_path = os.path.join(search_path, relative_executable_path)
    if py_utils.IsExecutable(executable_path):
      return executable_path
  return None


def _EnsureExecutable(gsutil):
  """chmod +x if gsutil is not executable."""
  st = os.stat(gsutil)
  if not st.st_mode & stat.S_IEXEC:
    os.chmod(gsutil, st.st_mode | stat.S_IEXEC)


def _IsRunningOnSwarming():
  return os.environ.get('SWARMING_HEADLESS') is not None


def _RunCommand(args):
  # On cros devices, telemetry runs as root, so HOME is set to /root/, which is
  # not writable. gsutil will attempt to create a download tracker dir in the
  # home dir and fail. To avoid this, override the HOME dir to something
  # writable when running on a cros device.
  #
  # TODO(tbarzic): Figure out a better way to handle gsutil on cros.
  #     http://crbug.com/386416, http://crbug.com/359293.
  gsutil_env = None
  if py_utils.IsRunningOnCrosDevice():
    gsutil_env = os.environ.copy()
    gsutil_env['HOME'] = _CROS_GSUTIL_HOME_WAR
  elif _IsRunningOnSwarming():
    gsutil_env = os.environ.copy()

  if os.name == 'nt':
    # If Windows, prepend python. Python scripts aren't directly executable.
    args = [sys.executable, _GSUTIL_PATH] + args
  else:
    # Don't do it on POSIX, in case someone is using a shell script to redirect.
    args = [_GSUTIL_PATH] + args
    _EnsureExecutable(_GSUTIL_PATH)

  if args[0] not in ('help', 'hash', 'version') and not IsNetworkIOEnabled():
    raise CloudStorageIODisabled(
        "Environment variable DISABLE_CLOUD_STORAGE_IO is set to 1. "
        'Command %s is not allowed to run' % args)

  gsutil = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, env=gsutil_env)
  stdout, stderr = gsutil.communicate()

  if gsutil.returncode:
    raise GetErrorObjectForCloudStorageStderr(stderr)

  return stdout


def GetErrorObjectForCloudStorageStderr(stderr):
  if (stderr.startswith((
      'You are attempting to access protected data with no configured',
      'Failure: No handler was ready to authenticate.')) or
      re.match('.*401.*does not have .* access to .*', stderr)):
    return CredentialsError()
  if ('status=403' in stderr or 'status 403' in stderr or
      '403 Forbidden' in stderr or
      re.match('.*403.*does not have .* access to .*', stderr)):
    return PermissionError()
  if (stderr.startswith('InvalidUriError') or 'No such object' in stderr or
      'No URLs matched' in stderr or 'One or more URLs matched no' in stderr):
    return NotFoundError(stderr)
  if '500 Internal Server Error' in stderr:
    return ServerError(stderr)
  return CloudStorageError(stderr)


def IsNetworkIOEnabled():
  """Returns True if cloud storage network IO is enabled."""
  disable_cloud_storage_env_val = os.getenv(DISABLE_CLOUD_STORAGE_IO)

  if disable_cloud_storage_env_val and disable_cloud_storage_env_val != '1':
    logger.error(
        'Unsupported value of environment variable '
        'DISABLE_CLOUD_STORAGE_IO. Expected None or \'1\' but got %s.',
        disable_cloud_storage_env_val)

  return disable_cloud_storage_env_val != '1'
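
# Example: disabling cloud storage IO (a sketch; 'some/remote/path' is a
# hypothetical object path). With DISABLE_CLOUD_STORAGE_IO set to '1', method
# calls that perform cloud storage network IO raise CloudStorageIODisabled:
#
#   os.environ[DISABLE_CLOUD_STORAGE_IO] = '1'
#   try:
#     Exists(PUBLIC_BUCKET, 'some/remote/path')
#   except CloudStorageIODisabled:
#     pass  # Expected: network IO is disabled.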


def List(bucket, prefix=None):
  """Returns all paths matching the given prefix in bucket.

  Returned paths are relative to the bucket root.
  If prefix is given, 'gsutil ls gs://<bucket>/<prefix>' will be executed;
  otherwise 'gsutil ls gs://<bucket>' will be executed.

  For more details, see:
  https://cloud.google.com/storage/docs/gsutil/commands/ls#directory-by-directory,-flat,-and-recursive-listings

  Args:
    bucket: Name of cloud storage bucket to look at.
    prefix: Path within the bucket to filter to.

  Returns:
    A list of files. All returned paths are relative to the bucket root
    directory. For example, List('my-bucket', prefix='foo/') will return
    results of the form ['/foo/123', '/foo/124', ...], as opposed to
    ['123', '124', ...].
  """
  bucket_prefix = 'gs://%s' % bucket
  if prefix is None:
    full_path = bucket_prefix
  else:
    full_path = '%s/%s' % (bucket_prefix, prefix)
  stdout = _RunCommand(['ls', full_path])
  return [url[len(bucket_prefix):] for url in stdout.splitlines()]
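
# Example usage of List() (a sketch; the prefix is hypothetical):
#
#   # List everything under 'snapshots/' in the public bucket. Results are
#   # relative to the bucket root, e.g. ['/snapshots/a.json', ...].
#   paths = List(PUBLIC_BUCKET, prefix='snapshots/')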


def ListDirs(bucket, path=''):
  """Returns only directories matching the given path in bucket.

  Args:
    bucket: Name of cloud storage bucket to look at.
    path: Path within the bucket to filter to. Path can include wildcards.
      path = 'foo*' will return ['mybucket/foo1/', 'mybucket/foo2/', ... ] but
      not mybucket/foo1/file.txt or mybucket/foo-file.txt.

  Returns:
    A list of directories. All returned paths are relative to the bucket root
    directory. For example, ListDirs('my-bucket', path='foo*') will return
    results of the form ['/foo1/', '/foo2/', ...], as opposed to
    ['foo1/', 'foo2/', ...].
  """
  bucket_prefix = 'gs://%s' % bucket
  full_path = '%s/%s' % (bucket_prefix, path)
  # Note that -d only ensures we don't recurse into subdirectories
  # unnecessarily. It still lists all non-directory files matching the path,
  # followed by a blank line. Adding -d here is a performance optimization.
  stdout = _RunCommand(['ls', '-d', full_path])
  dirs = []
  for url in stdout.splitlines():
    if len(url) == 0:
      continue
    # The only way to identify directories is by filtering for trailing slash.
    # See https://github.com/GoogleCloudPlatform/gsutil/issues/466
    if url[-1] == '/':
      dirs.append(url[len(bucket_prefix):])
  return dirs
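
# Example usage of ListDirs() (a sketch; the path is hypothetical):
#
#   # Returns only directories, e.g. ['/builds/2024/', '/builds/2025/'].
#   dirs = ListDirs(PUBLIC_BUCKET, path='builds/*')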


def Exists(bucket, remote_path):
  try:
    _RunCommand(['ls', 'gs://%s/%s' % (bucket, remote_path)])
    return True
  except NotFoundError:
    return False
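
# Example usage of Exists() (a sketch; the object path is hypothetical):
#
#   if not Exists(PUBLIC_BUCKET, 'traces/trace_1.html'):
#     logger.warning('Remote trace is missing.')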


def Move(bucket1, bucket2, remote_path):
  url1 = 'gs://%s/%s' % (bucket1, remote_path)
  url2 = 'gs://%s/%s' % (bucket2, remote_path)
  logger.info('Moving %s to %s', url1, url2)
  _RunCommand(['mv', url1, url2])


def Copy(bucket_from, bucket_to, remote_path_from, remote_path_to):
  """Copy a file from one location in CloudStorage to another.

  Args:
      bucket_from: The cloud storage bucket where the file is currently located.
      bucket_to: The cloud storage bucket it is being copied to.
      remote_path_from: The file path where the file is located in bucket_from.
      remote_path_to: The file path it is being copied to in bucket_to.

  This causes no changes locally or to the source file, and will overwrite any
  existing file in the destination location.
  """
  url1 = 'gs://%s/%s' % (bucket_from, remote_path_from)
  url2 = 'gs://%s/%s' % (bucket_to, remote_path_to)
  logger.info('Copying %s to %s', url1, url2)
  _RunCommand(['cp', url1, url2])
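
# Example usage of Copy() (a sketch; the object paths are hypothetical):
#
#   # Copy a result file from the internal bucket to the public one.
#   Copy(INTERNAL_BUCKET, PUBLIC_BUCKET, 'results/run_1.json',
#        'shared/run_1.json')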


def Delete(bucket, remote_path):
  url = 'gs://%s/%s' % (bucket, remote_path)
  logger.info('Deleting %s', url)
  _RunCommand(['rm', url])


def Get(bucket, remote_path, local_path):
  with _FileLock(local_path):
    _GetLocked(bucket, remote_path, local_path)


_CLOUD_STORAGE_GLOBAL_LOCK = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'cloud_storage_global_lock.py')


@contextlib.contextmanager
def _FileLock(base_path):
  pseudo_lock_path = '%s.pseudo_lock' % base_path
  _CreateDirectoryIfNecessary(os.path.dirname(pseudo_lock_path))

  # Make sure that we guard the creation, acquisition, release, and removal of
  # the pseudo lock all with the same guard (_CLOUD_STORAGE_GLOBAL_LOCK).
  # Otherwise, we can get nasty interleavings that result in multiple processes
  # thinking they have an exclusive lock, like:
  #
  # (Process 1) Create and acquire the pseudo lock
  # (Process 1) Release the pseudo lock
  # (Process 1) Release the file lock
  # (Process 2) Open and acquire the existing pseudo lock
  # (Process 1) Delete the (existing) pseudo lock
  # (Process 3) Create and acquire a new pseudo lock
  #
  # Using the same guard for creation and removal of the pseudo lock guarantees
  # that all processes are referring to the same lock.
  pseudo_lock_fd = None
  pseudo_lock_fd_return = []
  py_utils.WaitFor(lambda: _AttemptPseudoLockAcquisition(pseudo_lock_path,
                                                         pseudo_lock_fd_return),
                   LOCK_ACQUISITION_TIMEOUT)
  pseudo_lock_fd = pseudo_lock_fd_return[0]

  try:
    yield
  finally:
    py_utils.WaitFor(lambda: _AttemptPseudoLockRelease(pseudo_lock_fd),
                     LOCK_ACQUISITION_TIMEOUT)


def _AttemptPseudoLockAcquisition(pseudo_lock_path, pseudo_lock_fd_return):
  """Try to acquire the lock and return a boolean indicating whether the attempt
  was successful. If the attempt was successful, pseudo_lock_fd_return, which
  should be an empty array, will be modified to contain a single entry: the file
  descriptor of the (now acquired) lock file.

  This whole operation is guarded with the global cloud storage lock, which
  prevents race conditions that might otherwise cause multiple processes to
  believe they hold the same pseudo lock (see _FileLock for more details).
  """
  pseudo_lock_fd = None
  try:
    with open(_CLOUD_STORAGE_GLOBAL_LOCK) as global_file:
      with lock.FileLock(global_file, lock.LOCK_EX | lock.LOCK_NB):
        # Attempt to acquire the lock in a non-blocking manner. If we block,
        # then we'll cause deadlock because another process will be unable to
        # acquire the cloud storage global lock in order to release the pseudo
        # lock.
        pseudo_lock_fd = open(pseudo_lock_path, 'w')
        lock.AcquireFileLock(pseudo_lock_fd, lock.LOCK_EX | lock.LOCK_NB)
        pseudo_lock_fd_return.append(pseudo_lock_fd)
        return True
  except (lock.LockException, IOError):
    # We failed to acquire either the global cloud storage lock or the pseudo
    # lock.
    if pseudo_lock_fd:
      pseudo_lock_fd.close()
    return False


def _AttemptPseudoLockRelease(pseudo_lock_fd):
  """Try to release the pseudo lock and return a boolean indicating whether
  the release was successful.

  This whole operation is guarded with the global cloud storage lock, which
  prevents race conditions that might otherwise cause multiple processes to
  believe they hold the same pseudo lock (see _FileLock for more details).
  """
  pseudo_lock_path = pseudo_lock_fd.name
  try:
    with open(_CLOUD_STORAGE_GLOBAL_LOCK) as global_file:
      with lock.FileLock(global_file, lock.LOCK_EX | lock.LOCK_NB):
        lock.ReleaseFileLock(pseudo_lock_fd)
        pseudo_lock_fd.close()
        try:
          os.remove(pseudo_lock_path)
        except OSError:
          # We don't care if the pseudo lock gets removed elsewhere before
          # we have a chance to do so.
          pass
        return True
  except (lock.LockException, IOError):
    # We failed to acquire the global cloud storage lock and are thus unable to
    # release the pseudo lock.
    return False


def _CreateDirectoryIfNecessary(directory):
  if not os.path.exists(directory):
    os.makedirs(directory)


def _GetLocked(bucket, remote_path, local_path):
  url = 'gs://%s/%s' % (bucket, remote_path)
  logger.info('Downloading %s to %s', url, local_path)
  _CreateDirectoryIfNecessary(os.path.dirname(local_path))
  with tempfile.NamedTemporaryFile(
      dir=os.path.dirname(local_path),
      delete=False) as partial_download_path:
    try:
      # Windows won't download to an open file.
      partial_download_path.close()
      try:
        _RunCommand(['cp', url, partial_download_path.name])
      except ServerError:
        logger.info('Cloud Storage server error, retrying download')
        _RunCommand(['cp', url, partial_download_path.name])
      shutil.move(partial_download_path.name, local_path)
    finally:
      if os.path.exists(partial_download_path.name):
        os.remove(partial_download_path.name)


def Insert(bucket, remote_path, local_path, publicly_readable=False):
  """Uploads the file in |local_path| to cloud storage.

  Args:
    bucket: the google cloud storage bucket name.
    remote_path: the remote file path in |bucket|.
    local_path: path of the local file to be uploaded.
    publicly_readable: whether the uploaded file has publicly readable
      permission.

  Returns:
    The URL where the uploaded file can be viewed.
  """
  cloud_filepath = Upload(bucket, remote_path, local_path, publicly_readable)
  return cloud_filepath.view_url


class CloudFilepath(object):
  def __init__(self, bucket, remote_path):
    self.bucket = bucket
    self.remote_path = remote_path

  @property
  def view_url(self):
    """Get a human viewable url for the cloud file."""
    return 'https://console.developers.google.com/m/cloudstorage/b/%s/o/%s' % (
        self.bucket, self.remote_path)

  @property
  def fetch_url(self):
    """Get a machine fetchable url for the cloud file."""
    return 'gs://%s/%s' % (self.bucket, self.remote_path)


def Upload(bucket, remote_path, local_path, publicly_readable=False):
  """Uploads the file in |local_path| to cloud storage.

  A newer version of Insert() that returns a CloudFilepath object instead of a
  string.

  Args:
    bucket: the google cloud storage bucket name.
    remote_path: the remote file path in |bucket|.
    local_path: path of the local file to be uploaded.
    publicly_readable: whether the uploaded file has publicly readable
      permission.

  Returns:
    A CloudFilepath object providing the location of the object in various
    formats.
  """
  url = 'gs://%s/%s' % (bucket, remote_path)
  command_and_args = ['cp']
  extra_info = ''
  if publicly_readable:
    command_and_args += ['-a', 'public-read']
    extra_info = ' (publicly readable)'
  command_and_args += [local_path, url]
  logger.info('Uploading %s to %s%s', local_path, url, extra_info)
  _RunCommand(command_and_args)
  return CloudFilepath(bucket, remote_path)
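
# Example usage of Upload() (a sketch; the local and remote paths are
# hypothetical):
#
#   cloud_filepath = Upload(PUBLIC_BUCKET, 'traces/trace_1.html',
#                           '/tmp/trace_1.html', publicly_readable=True)
#   print(cloud_filepath.fetch_url)
#   # -> gs://chromium-telemetry/traces/trace_1.html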


def GetIfHashChanged(cs_path, download_path, bucket, file_hash):
  """Downloads |cs_path| from |bucket| to |download_path| if |download_path|
     doesn't exist or its hash doesn't match |file_hash|.

  Returns:
    True if the binary was changed.
  Raises:
    CredentialsError if the user has no configured credentials.
    PermissionError if the user does not have permission to access the bucket.
    NotFoundError if the file is not in the given bucket in cloud_storage.
  """
  with _FileLock(download_path):
    if (os.path.exists(download_path) and
        CalculateHash(download_path) == file_hash):
      return False
    _GetLocked(bucket, cs_path, download_path)
    return True
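
# Example usage of GetIfHashChanged() (a sketch; the paths and hash are
# hypothetical):
#
#   changed = GetIfHashChanged('binaries/tool', '/tmp/tool', INTERNAL_BUCKET,
#                              'da39a3ee5e6b4b0d3255bfef95601890afd80709')
#   if changed:
#     logger.info('Fetched a new copy of the tool.')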


def GetIfChanged(file_path, bucket):
  """Gets the file at file_path if it has a hash file that doesn't match or
  if there is no local copy of file_path, but there is a hash file for it.

  Returns:
    True if the binary was changed.
  Raises:
    CredentialsError if the user has no configured credentials.
    PermissionError if the user does not have permission to access the bucket.
    NotFoundError if the file is not in the given bucket in cloud_storage.
  """
  with _FileLock(file_path):
    hash_path = file_path + '.sha1'
    fetch_ts_path = file_path + '.fetchts'
    if not os.path.exists(hash_path):
      logger.warning('Hash file not found: %s', hash_path)
      return False

    expected_hash = ReadHash(hash_path)

    # To save the time required to compute the binary's hash (an expensive
    # operation, see crbug.com/793609#c2 for details), any time we fetch a new
    # binary we save not only that binary but also the time of the fetch in
    # |fetch_ts_path|. Any time the file needs to be updated (its hash in
    # |hash_path| changes), we only need to compare the timestamp of
    # |hash_path| with the timestamp in |fetch_ts_path| to figure out whether
    # the update has already been done.
    #
    # Note: for this to work, we assume that only cloud_storage.GetIfChanged
    # modifies the local |file_path| binary.

    if os.path.exists(fetch_ts_path) and os.path.exists(file_path):
      with open(fetch_ts_path) as f:
        data = f.read().strip()
        last_binary_fetch_ts = float(data)

      if last_binary_fetch_ts > os.path.getmtime(hash_path):
        return False

    # Whether the local binary's hash already matches expected_hash or we need
    # to fetch a new binary from cloud storage, update the timestamp in
    # |fetch_ts_path| with the current time anyway, since it is outdated
    # compared with the .sha1 file's last modified time.
    with open(fetch_ts_path, 'w') as f:
      f.write(str(time.time()))

    if os.path.exists(file_path) and CalculateHash(file_path) == expected_hash:
      return False
    _GetLocked(bucket, expected_hash, file_path)
    if CalculateHash(file_path) != expected_hash:
      os.remove(fetch_ts_path)
      raise RuntimeError(
          'Binary stored in cloud storage does not have hash matching .sha1 '
          'file. Please make sure that the binary file is uploaded using '
          'depot_tools/upload_to_google_storage.py script or through automatic '
          'framework.')
    return True
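
# Example usage of GetIfChanged() (a sketch; the path and bucket are
# hypothetical). Given a local hash file 'bin/device_tool.sha1' produced by
# depot_tools/upload_to_google_storage.py, this fetches 'bin/device_tool' from
# the bucket only when the local copy is missing or stale:
#
#   if GetIfChanged('bin/device_tool', INTERNAL_BUCKET):
#     logger.info('Downloaded a fresh copy of device_tool.')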


def GetFilesInDirectoryIfChanged(directory, bucket):
  """Scans the directory for .sha1 files and downloads the corresponding
  files from the given bucket in cloud storage if the local and remote hashes
  don't match or there is no local copy.
  """
  if not os.path.isdir(directory):
    raise ValueError(
        '%s does not exist. Must provide a valid directory path.' % directory)
  # Don't allow the root directory to be a serving_dir.
  if directory == os.path.abspath(os.sep):
    raise ValueError('Trying to serve root directory from HTTP server.')
  for dirpath, _, filenames in os.walk(directory):
    for filename in filenames:
      path_name, extension = os.path.splitext(
          os.path.join(dirpath, filename))
      if extension != '.sha1':
        continue
      GetIfChanged(path_name, bucket)


def CalculateHash(file_path):
  """Calculates and returns the hash of the file at file_path."""
  sha1 = hashlib.sha1()
  with open(file_path, 'rb') as f:
    while True:
      # Read in 1mb chunks, so it doesn't all have to be loaded into memory.
      chunk = f.read(1024 * 1024)
      if not chunk:
        break
      sha1.update(chunk)
  return sha1.hexdigest()


def ReadHash(hash_path):
  with open(hash_path, 'rb') as f:
    return f.read(1024).rstrip()
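
# Example: checking a local file against its .sha1 companion (a sketch; the
# file paths are hypothetical):
#
#   expected = ReadHash('bin/device_tool.sha1')
#   if CalculateHash('bin/device_tool') != expected:
#     logger.warning('Local binary is stale.')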