1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Wrappers for gsutil, for basic interaction with Google Cloud Storage."""
6
7import collections
8import contextlib
9import hashlib
10import logging
11import os
12import shutil
13import stat
14import subprocess
15import re
16import sys
17import tempfile
18
19import py_utils
20from py_utils import lock
21
22# Do a no-op import here so that cloud_storage_global_lock dep is picked up
23# by https://cs.chromium.org/chromium/src/build/android/test_runner.pydeps.
24# TODO(nedn, jbudorick): figure out a way to get rid of this ugly hack.
25from py_utils import cloud_storage_global_lock  # pylint: disable=unused-import
26
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


# Names of the cloud storage buckets known to this module.
PUBLIC_BUCKET = 'chromium-telemetry'
PARTNER_BUCKET = 'chrome-partner-telemetry'
INTERNAL_BUCKET = 'chrome-telemetry'
TELEMETRY_OUTPUT = 'chrome-telemetry-output'

# Uses ordered dict to make sure that bucket's key-value items are ordered from
# the most open to the most restrictive.
BUCKET_ALIASES = collections.OrderedDict((
    ('public', PUBLIC_BUCKET),
    ('partner', PARTNER_BUCKET),
    ('internal', INTERNAL_BUCKET),
    ('output', TELEMETRY_OUTPUT),
))

# Materialized as a list: on Python 3, keys() returns a view that can be
# neither indexed nor concatenated, whereas on Python 2 it already was a list.
BUCKET_ALIAS_NAMES = list(BUCKET_ALIASES.keys())
45
46
# Path of the gsutil script that every command in this module is run through.
_GSUTIL_PATH = os.path.join(py_utils.GetCatapultDir(), 'third_party', 'gsutil',
                            'gsutil')

# TODO(tbarzic): A workaround for http://crbug.com/386416 and
#     http://crbug.com/359293. See |_RunCommand|.
_CROS_GSUTIL_HOME_WAR = '/home/chromeos-test/'


# Name of the environment variable that disables cloud storage network IO.
# If it is set to '1', any call that would invoke cloud storage network IO
# raises CloudStorageIODisabled instead.
DISABLE_CLOUD_STORAGE_IO = 'DISABLE_CLOUD_STORAGE_IO'

# The maximum number of seconds to wait to acquire the pseudo lock for a cloud
# storage file before raising an exception.
LOCK_ACQUISITION_TIMEOUT = 10
62
63
class CloudStorageError(Exception):
  """Base class of every error produced by this module."""

  @staticmethod
  def _GetConfigInstructions():
    """Returns the user-facing steps for configuring gsutil credentials.

    On a cros device the suggested command is prefixed with the same HOME
    override that |_RunCommand| applies.
    """
    if py_utils.IsRunningOnCrosDevice():
      gsutil_invocation = 'HOME=%s %s' % (_CROS_GSUTIL_HOME_WAR, _GSUTIL_PATH)
    else:
      gsutil_invocation = _GSUTIL_PATH
    instructions_template = (
        'To configure your credentials:\n'
        '  1. Run "%s config" and follow its instructions.\n'
        '  2. If you have a @google.com account, use that account.\n'
        '  3. For the project-id, just enter 0.')
    return instructions_template % gsutil_invocation
75
76
class PermissionError(CloudStorageError):
  """Error used when gsutil's output indicates a 403 (forbidden) response."""

  def __init__(self):
    explanation = (
        'Attempted to access a file from Cloud Storage but you don\'t '
        'have permission. ')
    super(PermissionError, self).__init__(
        explanation + self._GetConfigInstructions())
83
84
class CredentialsError(CloudStorageError):
  """Error used when gsutil's output indicates missing credentials."""

  def __init__(self):
    explanation = (
        'Attempted to access a file from Cloud Storage but you have no '
        'configured credentials. ')
    super(CredentialsError, self).__init__(
        explanation + self._GetConfigInstructions())
91
92
class CloudStorageIODisabled(CloudStorageError):
  """Raised by |_RunCommand| when DISABLE_CLOUD_STORAGE_IO is set to '1'."""
95
96
class NotFoundError(CloudStorageError):
  """Error used when gsutil's output says the requested URL matched nothing."""
99
100
class ServerError(CloudStorageError):
  """Error used when gsutil's output reports a 500 Internal Server Error."""
103
104
# TODO(tonyg/dtu): Can this be replaced with distutils.spawn.find_executable()?
def _FindExecutableInPath(relative_executable_path, *extra_search_paths):
  """Returns the first executable found for |relative_executable_path|.

  Args:
    relative_executable_path: path of the executable relative to each searched
        directory.
    *extra_search_paths: directories that are searched before the ones listed
        in the PATH environment variable.

  Returns:
    The joined path of the first candidate for which py_utils.IsExecutable is
    true, or None if there is no such candidate.
  """
  search_paths = list(extra_search_paths)
  # PATH can be missing from the environment altogether; in that case only the
  # extra search paths are considered instead of failing with a KeyError.
  path_variable = os.environ.get('PATH')
  if path_variable is not None:
    search_paths.extend(path_variable.split(os.pathsep))

  for search_path in search_paths:
    executable_path = os.path.join(search_path, relative_executable_path)
    if py_utils.IsExecutable(executable_path):
      return executable_path
  return None
113
114
def _EnsureExecutable(gsutil):
  """Adds the owner-execute bit to |gsutil| if it is not already set."""
  current_mode = os.stat(gsutil).st_mode
  if current_mode & stat.S_IEXEC:
    return
  os.chmod(gsutil, current_mode | stat.S_IEXEC)
120
121
def _RunCommand(args):
  """Runs gsutil with |args| and returns what it wrote to standard output.

  Args:
    args: list of gsutil arguments; the first element is the gsutil command
        name (e.g. 'ls' or 'cp').

  Returns:
    The standard output of the gsutil process, as text.

  Raises:
    CloudStorageIODisabled: if network IO is disabled (see IsNetworkIOEnabled)
        and the command is not one of 'help', 'hash' or 'version'.
    CloudStorageError: (or the subclass picked by
        GetErrorObjectForCloudStorageStderr) if gsutil exits with a non-zero
        return code.
  """
  # Remember the gsutil command before the executable is prepended below, so
  # that the allow-list check further down inspects the command itself rather
  # than the path of the gsutil binary.
  command = args[0] if args else None

  # On cros device, as telemetry is running as root, home will be set to /root/,
  # which is not writable. gsutil will attempt to create a download tracker dir
  # in home dir and fail. To avoid this, override HOME dir to something writable
  # when running on cros device.
  #
  # TODO(tbarzic): Figure out a better way to handle gsutil on cros.
  #     http://crbug.com/386416, http://crbug.com/359293.
  gsutil_env = None
  if py_utils.IsRunningOnCrosDevice():
    gsutil_env = os.environ.copy()
    gsutil_env['HOME'] = _CROS_GSUTIL_HOME_WAR

  if os.name == 'nt':
    # If Windows, prepend python. Python scripts aren't directly executable.
    args = [sys.executable, _GSUTIL_PATH] + args
  else:
    # Don't do it on POSIX, in case someone is using a shell script to redirect.
    args = [_GSUTIL_PATH] + args
    _EnsureExecutable(_GSUTIL_PATH)

  if (command not in ('help', 'hash', 'version') and
      not IsNetworkIOEnabled()):
    raise CloudStorageIODisabled(
        "Environment variable DISABLE_CLOUD_STORAGE_IO is set to 1. "
        'Command %s is not allowed to run' % args)

  gsutil = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, env=gsutil_env)
  stdout, stderr = gsutil.communicate()

  # On Python 3 communicate() hands back bytes. Convert to text so that the
  # str-based error classification and the callers' string handling work; on
  # Python 2 bytes is str and both values are left untouched.
  if isinstance(stdout, bytes) and not isinstance(stdout, str):
    stdout = stdout.decode('utf-8')
  if isinstance(stderr, bytes) and not isinstance(stderr, str):
    # Never let an undecodable byte hide the real gsutil failure.
    stderr = stderr.decode('utf-8', 'replace')

  if gsutil.returncode:
    raise GetErrorObjectForCloudStorageStderr(stderr)

  return stdout
156
157
def GetErrorObjectForCloudStorageStderr(stderr):
  """Maps the stderr output of a failed gsutil run to an exception instance.

  Args:
    stderr: the standard error output of gsutil, as text (bytes are decoded
        as UTF-8 first).

  Returns:
    A CredentialsError, PermissionError, NotFoundError or ServerError instance
    when the output matches one of the known patterns, otherwise a plain
    CloudStorageError carrying |stderr| as its message. The instance is
    returned, not raised.
  """
  if isinstance(stderr, bytes) and not isinstance(stderr, str):
    # Python 3 subprocess output; the str patterns below need text.
    stderr = stderr.decode('utf-8', 'replace')

  # re.search is used for the "does not have ... access to" patterns so that
  # the message is recognized on any line of the output; re.match only ever
  # looked at the first line because '.' does not match a newline.
  if (stderr.startswith((
      'You are attempting to access protected data with no configured',
      'Failure: No handler was ready to authenticate.')) or
      re.search('401.*does not have .* access to ', stderr)):
    return CredentialsError()
  if ('status=403' in stderr or 'status 403' in stderr or
      '403 Forbidden' in stderr or
      re.search('403.*does not have .* access to ', stderr)):
    return PermissionError()
  if (stderr.startswith('InvalidUriError') or 'No such object' in stderr or
      'No URLs matched' in stderr or 'One or more URLs matched no' in stderr):
    return NotFoundError(stderr)
  if '500 Internal Server Error' in stderr:
    return ServerError(stderr)
  return CloudStorageError(stderr)
174
175
def IsNetworkIOEnabled():
  """Returns True unless the DISABLE_CLOUD_STORAGE_IO env variable is '1'.

  Any other non-empty value is reported through the module logger and treated
  as "enabled".
  """
  env_value = os.getenv(DISABLE_CLOUD_STORAGE_IO)
  io_disabled = env_value == '1'

  if env_value and not io_disabled:
    logger.error(
        'Unsupported value of environment variable '
        'DISABLE_CLOUD_STORAGE_IO. Expected None or \'1\' but got %s.',
        env_value)

  return not io_disabled
187
188
def List(bucket):
  """Returns the `gsutil ls` entries of |bucket| without the gs:// prefix."""
  query = 'gs://%s/' % bucket
  prefix_length = len(query)
  listing = _RunCommand(['ls', query])
  entries = []
  for line in listing.splitlines():
    # Each line is cut by the length of the queried URL.
    entries.append(line[prefix_length:])
  return entries
193
194
def Exists(bucket, remote_path):
  """Returns whether `gsutil ls` finds |remote_path| in |bucket|.

  Only NotFoundError is translated into False; other errors propagate.
  """
  url = 'gs://%s/%s' % (bucket, remote_path)
  try:
    _RunCommand(['ls', url])
  except NotFoundError:
    return False
  return True
201
202
def Move(bucket1, bucket2, remote_path):
  """Moves |remote_path| from |bucket1| to the same path in |bucket2|."""
  source_url, destination_url = [
      'gs://%s/%s' % (bucket, remote_path) for bucket in (bucket1, bucket2)]
  logger.info('Moving %s to %s', source_url, destination_url)
  _RunCommand(['mv', source_url, destination_url])
208
209
def Copy(bucket_from, bucket_to, remote_path_from, remote_path_to):
  """Copies a file between two Cloud Storage locations with `gsutil cp`.

  Args:
      bucket_from: The bucket that currently holds the file.
      bucket_to: The bucket that receives the copy.
      remote_path_from: Path of the file inside bucket_from.
      remote_path_to: Path of the copy inside bucket_to.

  Nothing is changed locally and the source file is left as is; a file that
  already exists at the destination should be overwritten.
  """
  source_url = 'gs://%s/%s' % (bucket_from, remote_path_from)
  destination_url = 'gs://%s/%s' % (bucket_to, remote_path_to)
  logger.info('Copying %s to %s', source_url, destination_url)
  copy_command = ['cp', source_url, destination_url]
  _RunCommand(copy_command)
226
227
def Delete(bucket, remote_path):
  """Removes |remote_path| from |bucket| with `gsutil rm`."""
  target_url = 'gs://%s/%s' % (bucket, remote_path)
  logger.info('Deleting %s', target_url)
  _RunCommand(['rm', target_url])
232
233
def Get(bucket, remote_path, local_path):
  """Downloads |remote_path| from |bucket| to |local_path|.

  The download runs while the pseudo lock for |local_path| is held.
  """
  download_lock = _FileLock(local_path)
  with download_lock:
    _GetLocked(bucket, remote_path, local_path)
237
238
# Path of the file that serves as the global cloud storage lock: the pseudo
# lock helpers below open it and take an exclusive, non-blocking file lock on
# it while they acquire or release a pseudo lock.
_CLOUD_STORAGE_GLOBAL_LOCK = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'cloud_storage_global_lock.py')
241
242
@contextlib.contextmanager
def _FileLock(base_path):
  """Context manager that holds the pseudo lock of |base_path| for its body.

  The lock file is '<base_path>.pseudo_lock'; its directory is created when
  missing. Acquisition and release are each retried through py_utils.WaitFor
  with LOCK_ACQUISITION_TIMEOUT (presumably WaitFor raises once the timeout
  elapses -- confirm against py_utils). The release runs even when the body
  raises.

  Args:
    base_path: path of the file the lock protects.
  """
  pseudo_lock_path = '%s.pseudo_lock' % base_path
  _CreateDirectoryIfNecessary(os.path.dirname(pseudo_lock_path))

  # Make sure that we guard the creation, acquisition, release, and removal of
  # the pseudo lock all with the same guard (_CLOUD_STORAGE_GLOBAL_LOCK).
  # Otherwise, we can get nasty interleavings that result in multiple processes
  # thinking they have an exclusive lock, like:
  #
  # (Process 1) Create and acquire the pseudo lock
  # (Process 1) Release the pseudo lock
  # (Process 1) Release the file lock
  # (Process 2) Open and acquire the existing pseudo lock
  # (Process 1) Delete the (existing) pseudo lock
  # (Process 3) Create and acquire a new pseudo lock
  #
  # Using the same guard for creation and removal of the pseudo lock guarantees
  # that all processes are referring to the same lock.
  pseudo_lock_fd = None
  # Out-parameter: a successful acquisition attempt appends the open lock file.
  pseudo_lock_fd_return = []
  py_utils.WaitFor(lambda: _AttemptPseudoLockAcquisition(pseudo_lock_path,
                                                         pseudo_lock_fd_return),
                   LOCK_ACQUISITION_TIMEOUT)
  pseudo_lock_fd = pseudo_lock_fd_return[0]

  try:
    yield
  finally:
    # Releasing needs the global lock as well, hence the retry loop here too.
    py_utils.WaitFor(lambda: _AttemptPseudoLockRelease(pseudo_lock_fd),
                     LOCK_ACQUISITION_TIMEOUT)
274
def _AttemptPseudoLockAcquisition(pseudo_lock_path, pseudo_lock_fd_return):
  """Makes a single, non-blocking attempt to take the pseudo lock.

  Returns True when the lock was acquired and False otherwise. On success the
  open (and now locked) lock file is appended to pseudo_lock_fd_return, which
  the caller should pass in as an empty list.

  The attempt runs entirely under the global cloud storage lock, which prevents
  the race conditions that could otherwise let several processes believe they
  hold the same pseudo lock (see _FileLock for more details).
  """
  exclusive_nonblocking = lock.LOCK_EX | lock.LOCK_NB
  lock_file = None
  try:
    with open(_CLOUD_STORAGE_GLOBAL_LOCK) as guard_file, \
        lock.FileLock(guard_file, exclusive_nonblocking):
      # The pseudo lock must be requested without blocking. Blocking here
      # would deadlock: the current holder needs the global cloud storage
      # lock, which we are holding, in order to release the pseudo lock.
      lock_file = open(pseudo_lock_path, 'w')
      lock.AcquireFileLock(lock_file, exclusive_nonblocking)
      pseudo_lock_fd_return.append(lock_file)
      return True
  except (lock.LockException, IOError):
    # Either the global cloud storage lock or the pseudo lock was unavailable.
    if lock_file:
      lock_file.close()
    return False
303
304
def _AttemptPseudoLockRelease(pseudo_lock_fd):
  """Makes a single attempt to release the pseudo lock.

  Returns True when the lock was released, its file closed and removal of the
  lock file attempted, and False when the global lock could not be taken.

  The attempt runs entirely under the global cloud storage lock, which prevents
  the race conditions that could otherwise let several processes believe they
  hold the same pseudo lock (see _FileLock for more details).
  """
  lock_file_path = pseudo_lock_fd.name
  exclusive_nonblocking = lock.LOCK_EX | lock.LOCK_NB
  try:
    with open(_CLOUD_STORAGE_GLOBAL_LOCK) as guard_file, \
        lock.FileLock(guard_file, exclusive_nonblocking):
      lock.ReleaseFileLock(pseudo_lock_fd)
      pseudo_lock_fd.close()
      try:
        os.remove(lock_file_path)
      except OSError:
        # Somebody else may already have removed the pseudo lock file; that
        # is fine.
        pass
      return True
  except (lock.LockException, IOError):
    # Without the global cloud storage lock the pseudo lock cannot be released.
    return False
330
331
def _CreateDirectoryIfNecessary(directory):
  """Creates |directory| (and missing parents) unless it already exists.

  Args:
    directory: path of the directory. An empty string, which is what
        os.path.dirname returns for a bare file name, means the current
        directory and is left alone.

  Raises:
    OSError: if the directory cannot be created.
  """
  if not directory or os.path.exists(directory):
    return
  try:
    os.makedirs(directory)
  except OSError:
    # Another process may have created the directory between the check above
    # and makedirs; only a directory that is still missing is an error.
    if not os.path.isdir(directory):
      raise
335
336
def _GetLocked(bucket, remote_path, local_path):
  """Downloads |remote_path| from |bucket| to |local_path|.

  The file is first downloaded to a temporary file next to |local_path| and
  then moved into place; the temporary file is removed if anything fails. A
  ServerError from the first download attempt triggers exactly one retry.
  """
  url = 'gs://%s/%s' % (bucket, remote_path)
  logger.info('Downloading %s to %s', url, local_path)
  _CreateDirectoryIfNecessary(os.path.dirname(local_path))
  with tempfile.NamedTemporaryFile(dir=os.path.dirname(local_path),
                                   delete=False) as partial_download:
    staging_path = partial_download.name
    try:
      # Windows won't download to an open file.
      partial_download.close()
      download_command = ['cp', url, staging_path]
      try:
        _RunCommand(download_command)
      except ServerError:
        logger.info('Cloud Storage server error, retrying download')
        _RunCommand(download_command)
      shutil.move(staging_path, local_path)
    finally:
      # Still present only when the download or the move did not complete.
      if os.path.exists(staging_path):
        os.remove(staging_path)
356
357
def Insert(bucket, remote_path, local_path, publicly_readable=False):
  """Uploads the file at |local_path| to cloud storage.

  Args:
    bucket: the google cloud storage bucket name.
    remote_path: the remote file path in |bucket|.
    local_path: path of the local file to be uploaded.
    publicly_readable: whether the upload is done with the 'public-read' ACL.

  Returns:
    The cloud console url built from |bucket| and |remote_path|.
  """
  url = 'gs://%s/%s' % (bucket, remote_path)
  if publicly_readable:
    acl_args = ['-a', 'public-read']
    extra_info = ' (publicly readable)'
  else:
    acl_args = []
    extra_info = ''
  logger.info('Uploading %s to %s%s', local_path, url, extra_info)
  _RunCommand(['cp'] + acl_args + [local_path, url])
  console_url = (
      'https://console.developers.google.com/m/cloudstorage/b/%s/o/%s' % (
          bucket, remote_path))
  return console_url
381
382
def GetIfHashChanged(cs_path, download_path, bucket, file_hash):
  """Downloads |cs_path| from |bucket| to |download_path| if |download_path|
     doesn't exist or its hash doesn't match |file_hash|.

  Returns:
    True if the file was downloaded, False if the local copy already matched.
  Raises:
    CredentialsError if the user has no configured credentials.
    PermissionError if the user does not have permission to access the bucket.
    NotFoundError if the file is not in the given bucket in cloud_storage.
  """
  with _FileLock(download_path):
    local_copy_is_current = (
        os.path.exists(download_path) and
        CalculateHash(download_path) == file_hash)
    if local_copy_is_current:
      return False
    _GetLocked(bucket, cs_path, download_path)
    return True
400
401
def GetIfChanged(file_path, bucket):
  """Downloads |file_path| when its '.sha1' hash file says it is out of date.

  The file is fetched from |bucket|, using the expected hash as the remote
  path, when the hash file exists and the local file is either missing or has
  a different hash. Without a hash file nothing is downloaded.

  Returns:
    True if the file was downloaded, False otherwise.
  Raises:
    CredentialsError if the user has no configured credentials.
    PermissionError if the user does not have permission to access the bucket.
    NotFoundError if the file is not in the given bucket in cloud_storage.
  """
  with _FileLock(file_path):
    hash_path = file_path + '.sha1'
    if os.path.exists(hash_path):
      expected_hash = ReadHash(hash_path)
      local_copy_is_current = (
          os.path.exists(file_path) and
          CalculateHash(file_path) == expected_hash)
      if local_copy_is_current:
        return False
      _GetLocked(bucket, expected_hash, file_path)
      return True

    logger.warning('Hash file not found: %s', hash_path)
    return False
424
425
def GetFilesInDirectoryIfChanged(directory, bucket):
  """Walks |directory| and calls GetIfChanged for every '.sha1' file found.

  For a hash file 'name.sha1' the file handed to GetIfChanged is 'name' in
  the same directory, fetched from |bucket|.

  Raises:
    ValueError if |directory| is not a directory or is the root directory.
  """
  if not os.path.isdir(directory):
    raise ValueError(
        '%s does not exist. Must provide a valid directory path.' % directory)
  # Don't allow the root directory to be a serving_dir.
  if directory == os.path.abspath(os.sep):
    raise ValueError('Trying to serve root directory from HTTP server.')
  for parent, _, names in os.walk(directory):
    for full_path in (os.path.join(parent, name) for name in names):
      target_path, suffix = os.path.splitext(full_path)
      if suffix == '.sha1':
        GetIfChanged(target_path, bucket)
444
445
def CalculateHash(file_path):
  """Returns the hex SHA-1 digest of the contents of the file at file_path."""
  digest = hashlib.sha1()
  with open(file_path, 'rb') as stream:
    # Feed the digest in 1mb pieces so that the whole file never has to be
    # held in memory; read() returns an empty value at end of file.
    for piece in iter(lambda: stream.read(1024 * 1024), b''):
      digest.update(piece)
  return digest.hexdigest()
457
458
def ReadHash(hash_path):
  """Returns the hash stored in the file at |hash_path| as text.

  At most the first 1024 bytes are read and trailing whitespace is stripped.
  The result is a str so that it compares equal to the hex digest returned by
  CalculateHash and formats cleanly into a gs:// URL.
  """
  with open(hash_path, 'rb') as f:
    stored_hash = f.read(1024).rstrip()
  # Binary mode yields bytes on Python 3, which never equal a str digest. On
  # Python 2 bytes is str, so the value is returned unchanged there.
  if isinstance(stored_hash, bytes) and not isinstance(stored_hash, str):
    return stored_hash.decode('utf-8')
  return stored_hash
462