# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrappers for gsutil, for basic interaction with Google Cloud Storage."""

import collections
import contextlib
import hashlib
import logging
import os
import shutil
import stat
import subprocess
import re
import sys
import tempfile
import time

import py_utils
from py_utils import lock

# Do a no-op import here so that cloud_storage_global_lock dep is picked up
# by https://cs.chromium.org/chromium/src/build/android/test_runner.pydeps.
# TODO(nedn, jbudorick): figure out a way to get rid of this ugly hack.
from py_utils import cloud_storage_global_lock  # pylint: disable=unused-import

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


PUBLIC_BUCKET = 'chromium-telemetry'
PARTNER_BUCKET = 'chrome-partner-telemetry'
INTERNAL_BUCKET = 'chrome-telemetry'
TELEMETRY_OUTPUT = 'chrome-telemetry-output'

# Use an ordered dict to make sure that the buckets' key-value items are
# ordered from the most open to the most restrictive.
BUCKET_ALIASES = collections.OrderedDict((
    ('public', PUBLIC_BUCKET),
    ('partner', PARTNER_BUCKET),
    ('internal', INTERNAL_BUCKET),
    ('output', TELEMETRY_OUTPUT),
))

BUCKET_ALIAS_NAMES = BUCKET_ALIASES.keys()


_GSUTIL_PATH = os.path.join(py_utils.GetCatapultDir(), 'third_party', 'gsutil',
                            'gsutil')

# TODO(tbarzic): A workaround for http://crbug.com/386416 and
#     http://crbug.com/359293. See |_RunCommand|.
_CROS_GSUTIL_HOME_WAR = '/home/chromeos-test/'

# If the environment variable DISABLE_CLOUD_STORAGE_IO is set to '1', any
# method call that performs cloud storage network IO will raise an exception.
DISABLE_CLOUD_STORAGE_IO = 'DISABLE_CLOUD_STORAGE_IO'

# The maximum number of seconds to wait to acquire the pseudo lock for a cloud
# storage file before raising an exception.
LOCK_ACQUISITION_TIMEOUT = 10


class CloudStorageError(Exception):

  @staticmethod
  def _GetConfigInstructions():
    command = _GSUTIL_PATH
    if py_utils.IsRunningOnCrosDevice():
      command = 'HOME=%s %s' % (_CROS_GSUTIL_HOME_WAR, _GSUTIL_PATH)
    return ('To configure your credentials:\n'
            '  1. Run "%s config" and follow its instructions.\n'
            '  2. If you have a @google.com account, use that account.\n'
            '  3. For the project-id, just enter 0.' % command)


class PermissionError(CloudStorageError):

  def __init__(self):
    super(PermissionError, self).__init__(
        'Attempted to access a file from Cloud Storage but you don\'t '
        'have permission. ' + self._GetConfigInstructions())


class CredentialsError(CloudStorageError):

  def __init__(self):
    super(CredentialsError, self).__init__(
        'Attempted to access a file from Cloud Storage but you have no '
        'configured credentials. ' + self._GetConfigInstructions())


class CloudStorageIODisabled(CloudStorageError):
  pass


class NotFoundError(CloudStorageError):
  pass


class ServerError(CloudStorageError):
  pass


# TODO(tonyg/dtu): Can this be replaced with distutils.spawn.find_executable()?
def _FindExecutableInPath(relative_executable_path, *extra_search_paths):
  search_paths = list(extra_search_paths) + os.environ['PATH'].split(os.pathsep)
  for search_path in search_paths:
    executable_path = os.path.join(search_path, relative_executable_path)
    if py_utils.IsExecutable(executable_path):
      return executable_path
  return None


def _EnsureExecutable(gsutil):
  """chmod +x if gsutil is not executable."""
  st = os.stat(gsutil)
  if not st.st_mode & stat.S_IEXEC:
    os.chmod(gsutil, st.st_mode | stat.S_IEXEC)


def _RunCommand(args):
  # On a CrOS device, telemetry runs as root, so HOME is set to /root/, which
  # is not writable. gsutil will attempt to create a download tracker dir in
  # the home dir and fail. To avoid this, override the HOME dir to something
  # writable when running on a CrOS device.
  #
  # TODO(tbarzic): Figure out a better way to handle gsutil on cros.
  #     http://crbug.com/386416, http://crbug.com/359293.
  gsutil_env = None
  if py_utils.IsRunningOnCrosDevice():
    gsutil_env = os.environ.copy()
    gsutil_env['HOME'] = _CROS_GSUTIL_HOME_WAR

  if os.name == 'nt':
    # If Windows, prepend python. Python scripts aren't directly executable.
    args = [sys.executable, _GSUTIL_PATH] + args
  else:
    # Don't do it on POSIX, in case someone is using a shell script to redirect.
    args = [_GSUTIL_PATH] + args
    _EnsureExecutable(_GSUTIL_PATH)

  if args[0] not in ('help', 'hash', 'version') and not IsNetworkIOEnabled():
    raise CloudStorageIODisabled(
        'Environment variable DISABLE_CLOUD_STORAGE_IO is set to 1. '
        'Command %s is not allowed to run.' % args)

  gsutil = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, env=gsutil_env)
  stdout, stderr = gsutil.communicate()

  if gsutil.returncode:
    raise GetErrorObjectForCloudStorageStderr(stderr)

  return stdout


def GetErrorObjectForCloudStorageStderr(stderr):
  if (stderr.startswith((
      'You are attempting to access protected data with no configured',
      'Failure: No handler was ready to authenticate.')) or
      re.match('.*401.*does not have .* access to .*', stderr)):
    return CredentialsError()
  if ('status=403' in stderr or 'status 403' in stderr or
      '403 Forbidden' in stderr or
      re.match('.*403.*does not have .* access to .*', stderr)):
    return PermissionError()
  if (stderr.startswith('InvalidUriError') or 'No such object' in stderr or
      'No URLs matched' in stderr or 'One or more URLs matched no' in stderr):
    return NotFoundError(stderr)
  if '500 Internal Server Error' in stderr:
    return ServerError(stderr)
  return CloudStorageError(stderr)


def IsNetworkIOEnabled():
  """Returns True if cloud storage network IO is enabled."""
  disable_cloud_storage_env_val = os.getenv(DISABLE_CLOUD_STORAGE_IO)

  if disable_cloud_storage_env_val and disable_cloud_storage_env_val != '1':
    logger.error(
        'Unsupported value of environment variable '
        'DISABLE_CLOUD_STORAGE_IO. Expected None or \'1\' but got %s.',
        disable_cloud_storage_env_val)

  return disable_cloud_storage_env_val != '1'
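
# Example (illustrative only; the remote path and local path below are
# hypothetical): a test harness can disable all cloud storage network IO by
# setting DISABLE_CLOUD_STORAGE_IO=1 in the environment, e.g.
#
#   import os
#   os.environ['DISABLE_CLOUD_STORAGE_IO'] = '1'
#
#   from py_utils import cloud_storage
#   assert not cloud_storage.IsNetworkIOEnabled()
#   cloud_storage.Get(cloud_storage.PUBLIC_BUCKET, 'some/remote/path',
#                     '/tmp/out')  # raises cloud_storage.CloudStorageIODisabled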


def List(bucket):
  query = 'gs://%s/' % bucket
  stdout = _RunCommand(['ls', query])
  return [url[len(query):] for url in stdout.splitlines()]


def Exists(bucket, remote_path):
  try:
    _RunCommand(['ls', 'gs://%s/%s' % (bucket, remote_path)])
    return True
  except NotFoundError:
    return False


def Move(bucket1, bucket2, remote_path):
  url1 = 'gs://%s/%s' % (bucket1, remote_path)
  url2 = 'gs://%s/%s' % (bucket2, remote_path)
  logger.info('Moving %s to %s', url1, url2)
  _RunCommand(['mv', url1, url2])


def Copy(bucket_from, bucket_to, remote_path_from, remote_path_to):
  """Copy a file from one location in CloudStorage to another.

  Args:
      bucket_from: The cloud storage bucket where the file is currently located.
      bucket_to: The cloud storage bucket it is being copied to.
      remote_path_from: The file path where the file is located in bucket_from.
      remote_path_to: The file path it is being copied to in bucket_to.

  This copies the file without changing anything locally or at the source
  location, and it will overwrite any existing file at the destination.
  """
  url1 = 'gs://%s/%s' % (bucket_from, remote_path_from)
  url2 = 'gs://%s/%s' % (bucket_to, remote_path_to)
  logger.info('Copying %s to %s', url1, url2)
  _RunCommand(['cp', url1, url2])
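
# Example (illustrative only; the bucket aliases are real constants from this
# module, but the object paths below are hypothetical):
#
#   from py_utils import cloud_storage
#   if cloud_storage.Exists(cloud_storage.PUBLIC_BUCKET, 'binaries/tool'):
#     cloud_storage.Copy(cloud_storage.PUBLIC_BUCKET,
#                        cloud_storage.INTERNAL_BUCKET,
#                        'binaries/tool', 'binaries/tool')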


def Delete(bucket, remote_path):
  url = 'gs://%s/%s' % (bucket, remote_path)
  logger.info('Deleting %s', url)
  _RunCommand(['rm', url])


def Get(bucket, remote_path, local_path):
  with _FileLock(local_path):
    _GetLocked(bucket, remote_path, local_path)


_CLOUD_STORAGE_GLOBAL_LOCK = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'cloud_storage_global_lock.py')


@contextlib.contextmanager
def _FileLock(base_path):
  pseudo_lock_path = '%s.pseudo_lock' % base_path
  _CreateDirectoryIfNecessary(os.path.dirname(pseudo_lock_path))

  # Make sure that we guard the creation, acquisition, release, and removal of
  # the pseudo lock all with the same guard (_CLOUD_STORAGE_GLOBAL_LOCK).
  # Otherwise, we can get nasty interleavings that result in multiple processes
  # thinking they have an exclusive lock, like:
  #
  # (Process 1) Create and acquire the pseudo lock
  # (Process 1) Release the pseudo lock
  # (Process 1) Release the file lock
  # (Process 2) Open and acquire the existing pseudo lock
  # (Process 1) Delete the (existing) pseudo lock
  # (Process 3) Create and acquire a new pseudo lock
  #
  # Using the same guard for creation and removal of the pseudo lock guarantees
  # that all processes are referring to the same lock.
  pseudo_lock_fd = None
  pseudo_lock_fd_return = []
  py_utils.WaitFor(lambda: _AttemptPseudoLockAcquisition(pseudo_lock_path,
                                                         pseudo_lock_fd_return),
                   LOCK_ACQUISITION_TIMEOUT)
  pseudo_lock_fd = pseudo_lock_fd_return[0]

  try:
    yield
  finally:
    py_utils.WaitFor(lambda: _AttemptPseudoLockRelease(pseudo_lock_fd),
                     LOCK_ACQUISITION_TIMEOUT)
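
# Example (illustrative sketch of internal usage; the local path and object
# name are hypothetical): _FileLock serializes access to a local download path
# across processes, e.g.
#
#   with _FileLock('/tmp/telemetry/binary'):
#     _GetLocked(PUBLIC_BUCKET, 'deadbeef', '/tmp/telemetry/binary')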

def _AttemptPseudoLockAcquisition(pseudo_lock_path, pseudo_lock_fd_return):
  """Try to acquire the lock and return a boolean indicating whether the attempt
  was successful. If the attempt was successful, pseudo_lock_fd_return, which
  should be an empty array, will be modified to contain a single entry: the file
  descriptor of the (now acquired) lock file.

  This whole operation is guarded with the global cloud storage lock, which
  prevents race conditions that might otherwise cause multiple processes to
  believe they hold the same pseudo lock (see _FileLock for more details).
  """
  pseudo_lock_fd = None
  try:
    with open(_CLOUD_STORAGE_GLOBAL_LOCK) as global_file:
      with lock.FileLock(global_file, lock.LOCK_EX | lock.LOCK_NB):
        # Attempt to acquire the lock in a non-blocking manner. If we block,
        # then we'll cause deadlock because another process will be unable to
        # acquire the cloud storage global lock in order to release the pseudo
        # lock.
        pseudo_lock_fd = open(pseudo_lock_path, 'w')
        lock.AcquireFileLock(pseudo_lock_fd, lock.LOCK_EX | lock.LOCK_NB)
        pseudo_lock_fd_return.append(pseudo_lock_fd)
        return True
  except (lock.LockException, IOError):
    # We failed to acquire either the global cloud storage lock or the pseudo
    # lock.
    if pseudo_lock_fd:
      pseudo_lock_fd.close()
    return False


def _AttemptPseudoLockRelease(pseudo_lock_fd):
  """Try to release the pseudo lock and return a boolean indicating whether
  the release was successful.

  This whole operation is guarded with the global cloud storage lock, which
  prevents race conditions that might otherwise cause multiple processes to
  believe they hold the same pseudo lock (see _FileLock for more details).
  """
  pseudo_lock_path = pseudo_lock_fd.name
  try:
    with open(_CLOUD_STORAGE_GLOBAL_LOCK) as global_file:
      with lock.FileLock(global_file, lock.LOCK_EX | lock.LOCK_NB):
        lock.ReleaseFileLock(pseudo_lock_fd)
        pseudo_lock_fd.close()
        try:
          os.remove(pseudo_lock_path)
        except OSError:
          # We don't care if the pseudo lock gets removed elsewhere before
          # we have a chance to do so.
          pass
        return True
  except (lock.LockException, IOError):
    # We failed to acquire the global cloud storage lock and are thus unable to
    # release the pseudo lock.
    return False


def _CreateDirectoryIfNecessary(directory):
  if not os.path.exists(directory):
    os.makedirs(directory)


def _GetLocked(bucket, remote_path, local_path):
  url = 'gs://%s/%s' % (bucket, remote_path)
  logger.info('Downloading %s to %s', url, local_path)
  _CreateDirectoryIfNecessary(os.path.dirname(local_path))
  with tempfile.NamedTemporaryFile(
      dir=os.path.dirname(local_path),
      delete=False) as partial_download_path:
    try:
      # Windows won't download to an open file.
      partial_download_path.close()
      try:
        _RunCommand(['cp', url, partial_download_path.name])
      except ServerError:
        logger.info('Cloud Storage server error, retrying download')
        _RunCommand(['cp', url, partial_download_path.name])
      shutil.move(partial_download_path.name, local_path)
    finally:
      if os.path.exists(partial_download_path.name):
        os.remove(partial_download_path.name)

def Insert(bucket, remote_path, local_path, publicly_readable=False):
  """Uploads the file at |local_path| to cloud storage.

  Args:
    bucket: the Google Cloud Storage bucket name.
    remote_path: the remote file path in |bucket|.
    local_path: path of the local file to be uploaded.
    publicly_readable: whether the uploaded file has publicly readable
        permission.

  Returns:
    The URL the file was uploaded to.
  """
  url = 'gs://%s/%s' % (bucket, remote_path)
  command_and_args = ['cp']
  extra_info = ''
  if publicly_readable:
    command_and_args += ['-a', 'public-read']
    extra_info = ' (publicly readable)'
  command_and_args += [local_path, url]
  logger.info('Uploading %s to %s%s', local_path, url, extra_info)
  _RunCommand(command_and_args)
  return 'https://console.developers.google.com/m/cloudstorage/b/%s/o/%s' % (
      bucket, remote_path)
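
# Example (illustrative only; the remote and local paths are hypothetical):
#
#   url = Insert(TELEMETRY_OUTPUT, 'traces/run_1.json', '/tmp/run_1.json',
#                publicly_readable=False)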


def GetIfHashChanged(cs_path, download_path, bucket, file_hash):
  """Downloads |cs_path| from |bucket| to |download_path| if |download_path|
  doesn't exist or its hash doesn't match |file_hash|.

  Returns:
    True if the binary was changed.
  Raises:
    CredentialsError if the user has no configured credentials.
    PermissionError if the user does not have permission to access the bucket.
    NotFoundError if the file is not in the given bucket in cloud_storage.
  """
  with _FileLock(download_path):
    if (os.path.exists(download_path) and
        CalculateHash(download_path) == file_hash):
      return False
    _GetLocked(bucket, cs_path, download_path)
    return True
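
# Example (illustrative only; the paths and hash value are hypothetical):
#
#   changed = GetIfHashChanged('binaries/tool', '/tmp/tool', PUBLIC_BUCKET,
#                              'da39a3ee5e6b4b0d3255bfef95601890afd80709')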


def GetIfChanged(file_path, bucket):
  """Gets the file at file_path if it has a hash file that doesn't match or
  if there is no local copy of file_path, but there is a hash file for it.

  Returns:
    True if the binary was changed.
  Raises:
    CredentialsError if the user has no configured credentials.
    PermissionError if the user does not have permission to access the bucket.
    NotFoundError if the file is not in the given bucket in cloud_storage.
  """
  with _FileLock(file_path):
    hash_path = file_path + '.sha1'
    fetch_ts_path = file_path + '.fetchts'
    if not os.path.exists(hash_path):
      logger.warning('Hash file not found: %s', hash_path)
      return False

    expected_hash = ReadHash(hash_path)

    # To save the time required to compute the binary's hash (an expensive
    # operation; see crbug.com/793609#c2 for details), any time we fetch a new
    # binary we save not only that binary but also the time of the fetch in
    # |fetch_ts_path|. Any time the file may need updating (i.e. its hash in
    # |hash_path| changes), we only need to compare the timestamp of
    # |hash_path| with the timestamp in |fetch_ts_path| to figure out whether
    # the update has already been done.
    #
    # Note: for this to work, we assume that only cloud_storage.GetIfChanged
    # modifies the local |file_path| binary.

    if os.path.exists(fetch_ts_path) and os.path.exists(file_path):
      with open(fetch_ts_path) as f:
        data = f.read().strip()
        last_binary_fetch_ts = float(data)

      if last_binary_fetch_ts > os.path.getmtime(hash_path):
        return False

    # Whether the local binary's hash already matches expected_hash or we need
    # to fetch a new binary from the cloud, update the timestamp in
    # |fetch_ts_path| with the current time anyway, since it is outdated
    # compared with the .sha1 file's last modified time.
    with open(fetch_ts_path, 'w') as f:
      f.write(str(time.time()))

    if os.path.exists(file_path) and CalculateHash(file_path) == expected_hash:
      return False
    _GetLocked(bucket, expected_hash, file_path)
    if CalculateHash(file_path) != expected_hash:
      os.remove(fetch_ts_path)
      raise RuntimeError(
          'Binary stored in cloud storage does not have a hash matching the '
          '.sha1 file. Please make sure that the binary file is uploaded '
          'using the depot_tools/upload_to_google_storage.py script or '
          'through the automatic framework.')
    return True
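
# Example (illustrative only; the path is hypothetical): given a checkout that
# contains 'bin/tool.sha1' (but not necessarily 'bin/tool'), the following
# fetches the matching binary from the bucket only when needed:
#
#   changed = GetIfChanged('bin/tool', PUBLIC_BUCKET)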


def GetFilesInDirectoryIfChanged(directory, bucket):
  """Scans the directory for .sha1 files and downloads the corresponding
  files from the given bucket in cloud storage if the local and remote hashes
  don't match or there is no local copy.
  """
  if not os.path.isdir(directory):
    raise ValueError(
        '%s does not exist. Must provide a valid directory path.' % directory)
  # Don't allow the root directory to be a serving_dir.
  if directory == os.path.abspath(os.sep):
    raise ValueError('Trying to serve root directory from HTTP server.')
  for dirpath, _, filenames in os.walk(directory):
    for filename in filenames:
      path_name, extension = os.path.splitext(
          os.path.join(dirpath, filename))
      if extension != '.sha1':
        continue
      GetIfChanged(path_name, bucket)
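
# Example (illustrative only; the directory is hypothetical): fetch every
# binary that has a .sha1 file somewhere under a serving directory:
#
#   GetFilesInDirectoryIfChanged('/path/to/serving_dir', PUBLIC_BUCKET)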


def CalculateHash(file_path):
  """Calculates and returns the SHA-1 hash of the file at file_path."""
  sha1 = hashlib.sha1()
  with open(file_path, 'rb') as f:
    while True:
      # Read in 1MB chunks, so it doesn't all have to be loaded into memory.
      chunk = f.read(1024 * 1024)
      if not chunk:
        break
      sha1.update(chunk)
  return sha1.hexdigest()


def ReadHash(hash_path):
  """Reads and returns the hash stored in the given .sha1 file."""
  with open(hash_path, 'rb') as f:
    return f.read(1024).rstrip()
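
# Example (illustrative only; the paths are hypothetical): a typical freshness
# check pairs CalculateHash with ReadHash on the matching .sha1 file:
#
#   if CalculateHash('/tmp/tool') == ReadHash('/tmp/tool.sha1'):
#     pass  # The local binary already matches the pinned hash.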