#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import google.auth
import logging
import os

from google.cloud import exceptions
from google.cloud import storage

# OS environment variable name for google application credentials.
_GOOGLE_CRED_ENV_VAR = 'GOOGLE_APPLICATION_CREDENTIALS'
# OAuth scope URL granting read/write access to Google Cloud Storage.
_READ_WRITE_SCOPE_URL = 'https://www.googleapis.com/auth/devstorage.read_write'


class GcsApiUtils(object):
    """GCS (Google Cloud Storage) API utility provider.

    Every public method first checks the _enabled flag; when the object is
    disabled the method logs an error and returns a neutral value
    ([], 0, False or None) without contacting GCS.

    Attributes:
        _key_path: string, path to the JSON key file of the service account.
        _bucket_name: string, Google Cloud Storage bucket name.
        _credentials: credentials object for the service account.
        _project: string, Google Cloud project name of the service account.
        _enabled: boolean, whether this GcsApiUtils object is enabled.
    """

    def __init__(self, key_path, bucket_name):
        """Loads the service account credentials.

        Side effect: sets the GOOGLE_APPLICATION_CREDENTIALS environment
        variable of the current process to key_path so that
        google.auth.default() picks the key file up.

        If the default credentials cannot be determined, the error is
        logged and the object is left disabled instead of raising.

        Args:
            key_path: string, path to the JSON key file of the service
                      account.
            bucket_name: string, Google Cloud Storage bucket name.
        """
        self._key_path = key_path
        self._bucket_name = bucket_name
        os.environ[_GOOGLE_CRED_ENV_VAR] = key_path
        self._enabled = True
        try:
            self._credentials, self._project = google.auth.default()
            if self._credentials.requires_scopes:
                self._credentials = self._credentials.with_scopes(
                    [_READ_WRITE_SCOPE_URL])
        except google.auth.exceptions.DefaultCredentialsError as e:
            logging.exception(e)
            self._enabled = False

    @property
    def Enabled(self):
        """Gets private variable _enabled.

        Returns:
            self._enabled: boolean, whether this GcsApiUtils object is enabled.
        """
        return self._enabled

    @Enabled.setter
    def Enabled(self, enabled):
        """Sets private variable _enabled."""
        self._enabled = enabled

    def _GetBucket(self):
        """Builds a storage client and fetches the configured bucket.

        A fresh client is created on every call, which is what each public
        method previously did inline.

        Returns:
            the bucket object named by self._bucket_name.
        """
        client = storage.Client(credentials=self._credentials)
        return client.get_bucket(self._bucket_name)

    def ListFilesWithPrefix(self, dir_path, strict=True):
        """Returns a list of files under a given GCS prefix.

        GCS uses prefixes to resemble the concept of directories; there are
        no real directories, only object names that contain '/'.

        For instance, a bucket may hold just these 2 objects:
            'corpus/'
            'corpus/ILight/ILight_corpus_seed/132'

        It may appear that the file named '132' is inside the directory
        ILight_corpus_seed, whose parent directory is ILight, whose parent
        directory is corpus. However, only 'corpus/' exists as an explicit
        object playing a directory role; 'corpus/ILight/' and
        'corpus/ILight/ILight_corpus_seed/' do not exist as objects.

        Given the two prefixes (directories),
            corpus/ILight/ILight_corpus_seed
            corpus/ILight/ILight_corpus_seed_01

        ListFilesWithPrefix(corpus/ILight/ILight_corpus_seed, strict=True)
        will only list files in corpus/ILight/ILight_corpus_seed,
        not in corpus/ILight/ILight_corpus_seed_01.

        ListFilesWithPrefix(corpus/ILight/ILight_corpus_seed, strict=False)
        will list files in both corpus/ILight/ILight_corpus_seed,
        and corpus/ILight/ILight_corpus_seed_01.

        Args:
            dir_path: path to the GCS directory of interest.
            strict: boolean, if True a trailing '/' is appended to dir_path
                    (when missing) so that only names under that exact
                    directory match. If False, dir_path is used as a raw
                    name prefix.

        Returns:
            a list of full object names (paths within the bucket) that
            start with the prefix; an empty list if this object is
            disabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return []

        if strict and not dir_path.endswith('/'):
            dir_path += '/'
        bucket = self._GetBucket()
        return [blob.name for blob in bucket.list_blobs(prefix=dir_path)]

    def CountFiles(self, dir_path):
        """Counts the number of files under a given GCS prefix.

        Args:
            dir_path: path to the GCS prefix of interest.

        Returns:
            number of files, if files exist under the prefix.
            0, if the prefix doesn't exist or this object is disabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return 0

        return len(self.ListFilesWithPrefix(dir_path))

    def PrefixExists(self, dir_path):
        """Checks if a file containing the prefix exists in the GCS bucket.

        This is done by counting the objects listed under the prefix. The
        count may or may not include the prefix/directory placeholder
        object itself, but either way a non-zero count means the prefix
        exists.

        Args:
            dir_path: path to the GCS prefix of interest.

        Returns:
            True, if such prefix exists in the GCS bucket.
            False, otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        # Value comparison; 'is not 0' would test object identity.
        return self.CountFiles(dir_path) != 0

    def FileExists(self, file_path):
        """Checks if a file exists in the GCS bucket.

        Args:
            file_path: path to the file in GCS.

        Returns:
            True, if the specific file exists in the GCS bucket.
            False, otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        return self._GetBucket().blob(file_path).exists()

    def DownloadFile(self, src_file_path, dest_file_path):
        """Downloads a file to a local destination directory.

        Does nothing (besides logging an error) if this object is disabled.

        Args:
            src_file_path: source file path, directory/filename in GCS.
            dest_file_path: destination file path, directory/filename in local.

        Raises:
            exception when the source file does not exist in GCS.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return

        blob = self._GetBucket().blob(src_file_path)
        blob.download_to_filename(dest_file_path)
        logging.info('File %s downloaded to %s.', src_file_path,
                     dest_file_path)

    def PrepareDownloadDestination(self, src_dir, dest_dir):
        """Makes prerequisite directories in the local destination.

        The local folder is dest_dir joined with the last path component
        of src_dir.

        Args:
            src_dir: source directory, in GCS.
            dest_dir: destination directory, in local.

        Returns:
            local_dest_folder, path to the local folder created (or had
            already existed); None if this object is disabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return

        local_dest_folder = os.path.join(dest_dir, os.path.basename(src_dir))
        if not os.path.exists(local_dest_folder):
            os.makedirs(local_dest_folder)
        return local_dest_folder

    def DownloadDir(self, src_dir, dest_dir):
        """Downloads a GCS src directory to a local dest dir.

        Every object under src_dir is written directly into
        dest_dir/<last component of src_dir>/ under its base name, i.e.
        nested GCS paths are flattened. Directory placeholder objects
        (names ending with '/') are skipped because they have no file
        name to write to.

        A file that disappears between listing and download is logged and
        skipped; it does not abort the remaining downloads.

        Args:
            src_dir: source directory, directory in GCS.
            dest_dir: destination directory, directory in local.

        Returns:
            True, if the source directory exists and the download was
                  attempted for its files.
            False, if the source directory does not exist or this object
                   is disabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        # One listing answers both "does the prefix exist" and "what needs
        # to be downloaded", avoiding a second round trip to GCS.
        filelist = self.ListFilesWithPrefix(src_dir)
        if not filelist:
            logging.error('requested GCS directory does not exist.')
            return False

        logging.info('successfully found the GCS directory.')
        local_dest_folder = self.PrepareDownloadDestination(src_dir, dest_dir)
        for src_file_path in filelist:
            if src_file_path.endswith('/'):
                # Placeholder object marking a "directory": its base name
                # is empty, so the destination would be the folder itself.
                continue
            dest_file_path = os.path.join(local_dest_folder,
                                          os.path.basename(src_file_path))
            try:
                self.DownloadFile(src_file_path, dest_file_path)
            except exceptions.NotFound:
                logging.error('download failed for file: %s',
                              src_file_path)
        return True

    def UploadFile(self, src_file_path, dest_file_path):
        """Uploads a file to a GCS bucket.

        Does nothing (besides logging an error) if this object is disabled.

        Args:
            src_file_path: source file path, directory/filename in local.
            dest_file_path: destination file path, directory/filename in GCS.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return

        blob = self._GetBucket().blob(dest_file_path)
        blob.upload_from_filename(src_file_path)
        logging.info('File %s uploaded to %s.', src_file_path, dest_file_path)

    def UploadDir(self, src_dir, dest_dir):
        """Uploads a local src dir to a GCS dest dir.

        The directory tree under src_dir is preserved: a local file
        src_dir/a/b is uploaded to
        dest_dir/<last component of src_dir>/a/b.

        Args:
            src_dir: source directory, directory in local.
            dest_dir: destination directory, directory in GCS.

        Returns:
            True, if the source directory exists and files successfully
                  uploaded.
            False, if the source directory does not exist or this object
                   is disabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        if not os.path.exists(src_dir):
            logging.error('requested local directory does not exist.')
            return False

        logging.info('successfully found the local directory.')
        src_basedir = os.path.basename(src_dir)
        for dirpath, _, filenames in os.walk(src_dir):
            for filename in filenames:
                src_file_path = os.path.join(dirpath, filename)
                # Take the path relative to src_dir rather than using
                # str.replace, which would also rewrite any later
                # occurrence of the src_dir text inside the path.
                rel_path = os.path.relpath(src_file_path, src_dir)
                dest_file_path = os.path.join(dest_dir, src_basedir,
                                              rel_path)
                self.UploadFile(src_file_path, dest_file_path)
        return True

    def MoveFile(self, src_file_path, dest_file_path, log_error=True):
        """Renames a blob, which effectively changes its path.

        Args:
            src_file_path: source file path in GCS.
            dest_file_path: destination file path in GCS.
            log_error: boolean, whether to log the exception when the
                       source file is not found.

        Returns:
            True if successful, False otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        bucket = self._GetBucket()
        blob = bucket.blob(src_file_path)
        try:
            bucket.rename_blob(blob, dest_file_path)
        except exceptions.NotFound as e:
            if log_error:
                logging.exception('file move was unsuccessful with error %s.',
                                  e)
            return False
        return True

    def DeleteFile(self, file_path):
        """Deletes a blob, which effectively deletes its corresponding file.

        Args:
            file_path: string, path to the file to remove.

        Returns:
            True if successful, False otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        blob = self._GetBucket().blob(file_path)
        try:
            blob.delete()
        except exceptions.NotFound as e:
            logging.exception('file delete was unsuccessful with error %s.', e)
            return False
        return True