1#
2# Copyright (C) 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the 'License');
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an 'AS IS' BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import google.auth
18import logging
19import os
20
21from google.cloud import exceptions
22from google.cloud import storage
23
# Name of the OS environment variable that holds the path to the Google
# application credentials (the service account JSON key file).
_GOOGLE_CRED_ENV_VAR = 'GOOGLE_APPLICATION_CREDENTIALS'
# OAuth scope URL requested for the credentials when they require explicit
# scopes; it asks for read/write access to Google Cloud Storage.
_READ_WRITE_SCOPE_URL = 'https://www.googleapis.com/auth/devstorage.read_write'
28
29
class GcsApiUtils(object):
    """GCS (Google Cloud Storage) API utility provider.

    When the service account credentials cannot be loaded at construction
    time the object is marked as not enabled; every operation then logs an
    error and returns an empty/False value instead of talking to GCS.

    Attributes:
        _key_path: string, path to the JSON key file of the service account.
        _bucket_name: string, Google Cloud Storage bucket name.
        _credentials: credentials object for the service account, or None
                      when the credentials could not be loaded.
        _project: string, Google Cloud project name of the service account,
                  or None when the credentials could not be loaded.
        _enabled: boolean, whether this GcsApiUtils object is enabled.
    """

    def __init__(self, key_path, bucket_name):
        """Loads the service account credentials for the given key file.

        Args:
            key_path: string, path to the JSON key file of the service
                      account.
            bucket_name: string, Google Cloud Storage bucket name.
        """
        self._key_path = key_path
        self._bucket_name = bucket_name
        # Defined up front so the attributes exist even when loading fails.
        self._credentials = None
        self._project = None
        # The key file location is handed to google.auth through the
        # process environment.
        os.environ[_GOOGLE_CRED_ENV_VAR] = key_path
        self._enabled = True
        try:
            self._credentials, self._project = google.auth.default()
            if self._credentials.requires_scopes:
                self._credentials = self._credentials.with_scopes(
                    [_READ_WRITE_SCOPE_URL])
        except google.auth.exceptions.DefaultCredentialsError as e:
            logging.exception(e)
            self._enabled = False

    @property
    def Enabled(self):
        """Gets private variable _enabled.

        Returns:
            self._enabled: boolean, whether this GcsApiUtils object is enabled.
        """
        return self._enabled

    @Enabled.setter
    def Enabled(self, enabled):
        """Sets private variable _enabled."""
        self._enabled = enabled

    def _GetBucket(self):
        """Creates a storage client and fetches the configured bucket.

        Returns:
            the bucket object for self._bucket_name.
        """
        client = storage.Client(credentials=self._credentials)
        return client.get_bucket(self._bucket_name)

    def ListFilesWithPrefix(self, dir_path, strict=True):
        """Returns a list of files under a given GCS prefix.

        GCS uses prefixes to resemble the concept of directories.

        For instance, a "directory" called 'corpus' may be represented by
        an explicit placeholder file named 'corpus/'.

        A file like 'corpus/ILight/ILight_corpus_seed/132' looks as if the
        file named '132' is inside the directory ILight_corpus_seed, whose
        parent directory is ILight, whose parent directory is corpus.

        However, the only explicit file that plays a directory role here is
        'corpus/'. There are no directories 'corpus/ILight' or
        'corpus/ILight/ILight_corpus_seed'; the bucket holds only 2 files:
        'corpus/'
        'corpus/ILight/ILight_corpus_seed/132'

        Given the two prefixes (directories),
            corpus/ILight/ILight_corpus_seed
            corpus/ILight/ILight_corpus_seed_01

        ListFilesWithPrefix(corpus/ILight/ILight_corpus_seed, strict=True)
        will only list files in corpus/ILight/ILight_corpus_seed,
        not in corpus/ILight/ILight_corpus_seed_01.

        ListFilesWithPrefix(corpus/ILight/ILight_corpus_seed, strict=False)
        will list files in both corpus/ILight/ILight_corpus_seed,
        and corpus/ILight/ILight_corpus_seed_01.

        Args:
            dir_path: path to the GCS directory of interest.
            strict: boolean, when True a trailing '/' is appended to
                    dir_path (if missing) so that only names under that
                    exact directory match.

        Returns:
            a list of absolute path filenames of the content of the given
            GCS directory; an empty list if this object is not enabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return []

        if strict and not dir_path.endswith('/'):
            dir_path += '/'
        return [
            blob.name
            for blob in self._GetBucket().list_blobs(prefix=dir_path)
        ]

    def CountFiles(self, dir_path):
        """Counts the number of files under a given GCS prefix.

        Args:
            dir_path: path to the GCS prefix of interest.

        Returns:
            number of files, if files exist under the prefix.
            0, if prefix doesnt exist or this object is not enabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return 0

        return len(self.ListFilesWithPrefix(dir_path))

    def PrefixExists(self, dir_path):
        """Checks if a file containing the prefix exists in the GCS bucket.

        This is done by counting the files listed under the prefix. The
        count includes the prefix/directory placeholder file when it
        exists, so a directory holding nothing but its placeholder is
        still reported as existing.

        Args:
            dir_path: path to the GCS prefix of interest.

        Returns:
            True, if such prefix exists in the GCS bucket.
            False, otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        # Value comparison; identity ('is not') on an int literal only
        # works by accident of interpreter caching.
        return self.CountFiles(dir_path) != 0

    def FileExists(self, file_path):
        """Checks if a file exists in the GCS bucket.

        Args:
            file_path: path to the file in GCS.

        Returns:
            True, if the specific file exists in the GCS bucket.
            False, otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        return self._GetBucket().blob(file_path).exists()

    def DownloadFile(self, src_file_path, dest_file_path):
        """Downloads a file to a local destination directory.

        Args:
            src_file_path: source file path, directory/filename in GCS.
            dest_file_path: destination file path, directory/filename in local.

        Raises:
            exception when the source file does not exist in GCS.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return

        blob = self._GetBucket().blob(src_file_path)
        blob.download_to_filename(dest_file_path)
        logging.info('File %s downloaded to %s.', src_file_path,
                     dest_file_path)

    def PrepareDownloadDestination(self, src_dir, dest_dir):
        """Makes prerequisite directories in the local destination.

        Args:
            src_dir: source directory, in GCS.
            dest_dir: destination directory, in local.

        Returns:
            local_dest_folder, path to the local folder created (or had
            already existed); None if this object is not enabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return None

        local_dest_folder = os.path.join(dest_dir, os.path.basename(src_dir))
        if not os.path.exists(local_dest_folder):
            os.makedirs(local_dest_folder)
        return local_dest_folder

    def DownloadDir(self, src_dir, dest_dir):
        """Downloads a GCS src directory to a local dest dir.

        Every file listed under src_dir is saved directly inside
        dest_dir/<basename of src_dir>, named after the last component of
        its GCS path. Directory placeholder entries (names ending in '/')
        are skipped because they have no file name to save under. A file
        that turns out to be missing at download time is logged and
        skipped; the remaining files are still downloaded.

        Args:
            src_dir: source directory, directory in GCS.
            dest_dir: destination directory, directory in local.

        Returns:
            True, if the source directory exists and the download was
            attempted for its files.
            False, if the source directory does not exist or this object
            is not enabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        # One listing answers both "does the prefix exist" and "what is
        # in it", saving a second round trip to GCS.
        filelist = self.ListFilesWithPrefix(src_dir)
        if not filelist:
            logging.error('requested GCS directory does not exist.')
            return False

        logging.info('successfully found the GCS directory.')
        local_dest_folder = self.PrepareDownloadDestination(src_dir, dest_dir)
        for src_file_path in filelist:
            if src_file_path.endswith('/'):
                # Placeholder entry standing for a directory: its basename
                # is empty, so the target would be the folder itself.
                continue
            dest_file_path = os.path.join(local_dest_folder,
                                          os.path.basename(src_file_path))
            try:
                self.DownloadFile(src_file_path, dest_file_path)
            except exceptions.NotFound:
                logging.error('download failed for file: %s',
                              src_file_path)
        return True

    def UploadFile(self, src_file_path, dest_file_path):
        """Uploads a file to a GCS bucket.

        Args:
            src_file_path: source file path, directory/filename in local.
            dest_file_path: destination file path, directory/filename in GCS.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return

        blob = self._GetBucket().blob(dest_file_path)
        blob.upload_from_filename(src_file_path)
        logging.info('File %s uploaded to %s.', src_file_path, dest_file_path)

    def UploadDir(self, src_dir, dest_dir):
        """Uploads a local src dir to a GCS dest dir.

        The local tree is walked recursively and its layout is kept: a file
        at src_dir/a/b is uploaded to dest_dir/<basename of src_dir>/a/b.

        Args:
           src_dir: source directory, directory in local.
           dest_dir: destination directory, directory in GCS.

        Returns:
            True, if the source directory exists and files successfully uploaded.
            False, if the source directory does not exist or this object
            is not enabled.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        if not os.path.exists(src_dir):
            logging.error('requested local directory does not exist.')
            return False

        logging.info('successfully found the local directory.')
        src_basedir = os.path.basename(src_dir)
        for dirpath, _, filenames in os.walk(src_dir):
            for filename in filenames:
                src_file_path = os.path.join(dirpath, filename)
                # A relative path strips only the leading src_dir; a plain
                # string replace would also rewrite any later occurrence
                # of src_dir inside the path.
                rel_file_path = os.path.relpath(src_file_path, src_dir)
                dest_file_path = os.path.join(dest_dir, src_basedir,
                                              rel_file_path)
                self.UploadFile(src_file_path, dest_file_path)
        return True

    def MoveFile(self, src_file_path, dest_file_path, log_error=True):
        """Renames a blob, which effectively changes its path.

        Args:
            src_file_path: source file path in GCS.
            dest_file_path: destination file path in GCS.
            log_error: boolean, whether to log the error when the source
                       file is not found.

        Returns:
            True if successful, False otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        bucket = self._GetBucket()
        blob = bucket.blob(src_file_path)
        try:
            bucket.rename_blob(blob, dest_file_path)
        except exceptions.NotFound as e:
            if log_error:
                logging.exception('file move was unsuccessful with error %s.',
                                  e)
            return False
        return True

    def DeleteFile(self, file_path):
        """Deletes a blob, which effectively deletes its corresponding file.

        Args:
            file_path: string, path to the file to remove.

        Returns:
            True if successful, False otherwise.
        """
        if not self._enabled:
            logging.error('This GcsApiUtils object is not enabled.')
            return False

        blob = self._GetBucket().blob(file_path)
        try:
            blob.delete()
        except exceptions.NotFound as e:
            logging.exception('file delete was unsuccessful with error %s.', e)
            return False
        return True