#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os

import google.auth
from google.cloud import exceptions
from google.cloud import storage

# OS environment variable name for google application credentials.
_GOOGLE_CRED_ENV_VAR = 'GOOGLE_APPLICATION_CREDENTIALS'
# URL to the Google Cloud storage authentication.
_READ_WRITE_SCOPE_URL = 'https://www.googleapis.com/auth/devstorage.read_write'


class GcsApiUtils(object):
    """GCS (Google Cloud Storage) API utility provider.

    Attributes:
        _key_path: string, path to the JSON key file of the service account.
        _bucket_name: string, Google Cloud Storage bucket name.
        _credentials: credentials object for the service account.
        _project: string, Google Cloud project name of the service account.
        _enabled: boolean, whether this GcsApiUtils object is enabled.
    """

    def __init__(self, key_path, bucket_name):
        self._key_path = key_path
        self._bucket_name = bucket_name
        # google.auth.default() reads credentials from this env variable.
        os.environ[_GOOGLE_CRED_ENV_VAR] = key_path
        self._enabled = True
        try:
            self._credentials, self._project = google.auth.default()
            if self._credentials.requires_scopes:
                self._credentials = self._credentials.with_scopes(
                    [_READ_WRITE_SCOPE_URL])
        except google.auth.exceptions.DefaultCredentialsError as e:
            # Disable this object instead of raising, so callers can keep
            # running without GCS support.
            logging.exception(e)
            self._enabled = False

    @property
    def Enabled(self):
        """Gets private variable _enabled.

        Returns:
            self._enabled: boolean, whether this GcsApiUtils object is enabled.
        """
        return self._enabled

    @Enabled.setter
    def Enabled(self, enabled):
        """Sets private variable _enabled."""
        self._enabled = enabled

    def ListFilesWithPrefix(self, dir_path, strict=True):
        """Returns a list of files under a given GCS prefix.

        GCS uses prefixes to resemble the concept of directories.

        For instance, if we have a directory called 'corpus,'
        then we have a file named corpus.

        Then we can have files like 'corpus/ILight/ILight_corpus_seed/132,'
        which may appear that the file named '132' is inside the directory
        ILight_corpus_seed, whose parent directory is ILight, whose parent
        directory is corpus.

        However, we only have 1 explicit file that resembles a directory
        role here: 'corpus.' We do not have directories 'corpus/ILight' or
        'corpus/ILight/ILight_corpus.'

        Here, we have only 2 files:
        'corpus/'
        'corpus/ILight/ILight_corpus_seed/132'

        Given the two prefixes (directories),
        corpus/ILight/ILight_corpus_seed
        corpus/ILight/ILight_corpus_seed_01

        ListFilesWithPrefix(corpus/ILight/ILight_corpus_seed, strict=True)
        will only list files in corpus/ILight/ILight_corpus_seed,
        not in corpus/ILight/ILight_corpus_seed_01.

        ListFilesWithPrefix(corpus/ILight/ILight_corpus_seed, strict=False)
        will list files in both corpus/ILight/ILight_corpus_seed,
        and corpus/ILight/ILight_corpus_seed_01.

        Args:
            dir_path: path to the GCS directory of interest.
            strict: boolean, whether to append a trailing '/' to dir_path
                    so that sibling prefixes sharing the same name stem
                    are excluded. Defaults to True.

        Returns:
            a list of absolute path filenames of the content of the given
            GCS directory.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return []

        if strict and not dir_path.endswith('/'):
            dir_path += '/'
        client = storage.Client(credentials=self._credentials)
        bucket = client.get_bucket(self._bucket_name)
        dir_list = list(bucket.list_blobs(prefix=dir_path))
        # 'blob' instead of 'file' to avoid shadowing the builtin.
        return [blob.name for blob in dir_list]

    def CountFiles(self, dir_path):
        """Counts the number of files under a given GCS prefix.

        Args:
            dir_path: path to the GCS prefix of interest.

        Returns:
            number of files, if files exist under the prefix.
            0, if prefix doesn't exist.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return 0

        return len(self.ListFilesWithPrefix(dir_path))

    def PrefixExists(self, dir_path):
        """Checks if a file containing the prefix exists in the GCS bucket.

        This is effectively "counting" the number of files inside the
        directory. Depending on whether the prefix/directory file exists
        or not, this function may return the number of files in the
        directory or the number + 1 (the prefix/directory file).

        Returns:
            True, if such prefix exists in the GCS bucket.
            False, otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        # Equality, not identity: 'is not 0' relied on small-int interning
        # and raises SyntaxWarning on Python >= 3.8.
        return self.CountFiles(dir_path) != 0

    def FileExists(self, file_path):
        """Checks if a file exists in the GCS bucket.

        Returns:
            True, if the specific file exists in the GCS bucket.
            False, otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        client = storage.Client(credentials=self._credentials)
        bucket = client.get_bucket(self._bucket_name)
        blob = bucket.blob(file_path)
        return blob.exists()

    def DownloadFile(self, src_file_path, dest_file_path):
        """Downloads a file to a local destination directory.

        Args:
            src_file_path: source file path, directory/filename in GCS.
            dest_file_path: destination file path, directory/filename in
                            local.

        Raises:
            exception when the source file does not exist in GCS.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return

        client = storage.Client(credentials=self._credentials)
        bucket = client.get_bucket(self._bucket_name)
        blob = bucket.blob(src_file_path)
        blob.download_to_filename(dest_file_path)
        logging.info('File %s downloaded to %s.', src_file_path,
                     dest_file_path)

    def PrepareDownloadDestination(self, src_dir, dest_dir):
        """Makes prerequisite directories in the local destination.

        Args:
            src_dir: source directory, in GCS.
            dest_dir: destination directory, in local.

        Returns:
            local_dest_folder, path to the local folder created (or had
            already existed).
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return

        local_dest_folder = os.path.join(dest_dir, os.path.basename(src_dir))
        if not os.path.exists(local_dest_folder):
            os.makedirs(local_dest_folder)
        return local_dest_folder

    def DownloadDir(self, src_dir, dest_dir):
        """Downloads a GCS src directory to a local dest dir.

        Args:
            src_dir: source directory, directory in GCS.
            dest_dir: destination directory, directory in local.

        Raises:
            exception when a source file does not exist in GCS.

        Returns:
            True, if the source directory exists and files successfully
            downloaded. False, if the source directory does not exist.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        if self.PrefixExists(src_dir):
            logging.info('successfully found the GCS directory.')
            self.PrepareDownloadDestination(src_dir, dest_dir)
            filelist = self.ListFilesWithPrefix(src_dir)
            for src_file_path in filelist:
                dest_file_path = os.path.join(
                    dest_dir,
                    os.path.join(
                        os.path.basename(src_dir),
                        os.path.basename(src_file_path)))
                try:
                    self.DownloadFile(src_file_path, dest_file_path)
                except exceptions.NotFound as e:
                    # Include the exception detail; it was previously
                    # bound but never logged.
                    logging.error('download failed for file %s: %s',
                                  src_file_path, e)
            return True
        else:
            logging.error('requested GCS directory does not exist.')
            return False

    def UploadFile(self, src_file_path, dest_file_path):
        """Uploads a file to a GCS bucket.

        Args:
            src_file_path: source file path, directory/filename in local.
            dest_file_path: destination file path, directory/filename in
                            GCS.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return

        client = storage.Client(credentials=self._credentials)
        bucket = client.get_bucket(self._bucket_name)
        blob = bucket.blob(dest_file_path)
        blob.upload_from_filename(src_file_path)
        logging.info('File %s uploaded to %s.', src_file_path, dest_file_path)

    def UploadDir(self, src_dir, dest_dir):
        """Uploads a local src dir to a GCS dest dir.

        Args:
            src_dir: source directory, directory in local.
            dest_dir: destination directory, directory in GCS.

        Returns:
            True, if the source directory exists and files successfully
            uploaded. False, if the source directory does not exist.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        if os.path.exists(src_dir):
            logging.info('successfully found the local directory.')
            src_basedir = os.path.basename(src_dir)
            for dirpath, _, filenames in os.walk(src_dir):
                for filename in filenames:
                    src_file_path = os.path.join(dirpath, filename)
                    # Rebase the local path under dest_dir, keeping the
                    # source directory's basename as the top-level prefix.
                    dest_file_path = os.path.join(
                        dest_dir, src_file_path.replace(src_dir, src_basedir))
                    self.UploadFile(src_file_path, dest_file_path)
            return True
        else:
            logging.error('requested local directory does not exist.')
            return False

    def MoveFile(self, src_file_path, dest_file_path, log_error=True):
        """Renames a blob, which effectively changes its path.

        Args:
            src_file_path: source file path in GCS.
            dest_file_path: destination file path in GCS.
            log_error: boolean, whether to log a failed move. Defaults to
                       True.

        Returns:
            True if successful, False otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        client = storage.Client(credentials=self._credentials)
        bucket = client.get_bucket(self._bucket_name)
        blob = bucket.blob(src_file_path)
        try:
            # Return value (the renamed blob) is not needed.
            bucket.rename_blob(blob, dest_file_path)
        except exceptions.NotFound as e:
            if log_error:
                logging.exception('file move was unsuccessful with error %s.',
                                  e)
            return False
        return True

    def DeleteFile(self, file_path):
        """Deletes a blob, which effectively deletes its corresponding file.

        Args:
            file_path: string, path to the file to remove.

        Returns:
            True if successful, False otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        client = storage.Client(credentials=self._credentials)
        bucket = client.get_bucket(self._bucket_name)
        blob = bucket.blob(file_path)
        try:
            blob.delete()
        except exceptions.NotFound as e:
            logging.exception('file delete was unsuccessful with error %s.',
                              e)
            return False
        return True