1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
18 
19 #include <string>
20 #include <utility>
21 #include <vector>
22 
23 #include "tensorflow/core/lib/core/status.h"
24 #include "tensorflow/core/platform/cloud/auth_provider.h"
25 #include "tensorflow/core/platform/cloud/compute_engine_metadata_client.h"
26 #include "tensorflow/core/platform/cloud/compute_engine_zone_provider.h"
27 #include "tensorflow/core/platform/cloud/expiring_lru_cache.h"
28 #include "tensorflow/core/platform/cloud/file_block_cache.h"
29 #include "tensorflow/core/platform/cloud/gcs_dns_cache.h"
30 #include "tensorflow/core/platform/cloud/gcs_throttle.h"
31 #include "tensorflow/core/platform/cloud/http_request.h"
32 #include "tensorflow/core/platform/cloud/retrying_file_system.h"
33 #include "tensorflow/core/platform/file_system.h"
34 
35 namespace tensorflow {
36 
37 class GcsFileSystem;
38 
39 /// GcsStatsInterface allows for instrumentation of the GCS file system.
40 ///
41 /// GcsStatsInterface and its subclasses must be safe to use from multiple
42 /// threads concurrently.
43 ///
44 /// WARNING! This is an experimental interface that may change or go away at any
45 /// time.
46 class GcsStatsInterface {
47  public:
48   /// Configure is called by the GcsFileSystem to provide instrumentation hooks.
49   ///
50   /// Note: Configure can be called multiple times (e.g. if the block cache is
51   /// re-initialized).
52   virtual void Configure(GcsFileSystem* fs, GcsThrottle* throttle,
53                          const FileBlockCache* block_cache) = 0;
54 
55   /// RecordBlockLoadRequest is called to record a block load request is about
56   /// to be made.
57   virtual void RecordBlockLoadRequest(const string& file, size_t offset) = 0;
58 
59   /// RecordBlockRetrieved is called once a block within the file has been
60   /// retrieved.
61   virtual void RecordBlockRetrieved(const string& file, size_t offset,
62                                     size_t bytes_transferred) = 0;
63 
64   // RecordStatObjectRequest is called once a statting object request over GCS
65   // is about to be made.
66   virtual void RecordStatObjectRequest() = 0;
67 
68   /// HttpStats is called to optionally provide a RequestStats listener
69   /// to be annotated on every HTTP request made to the GCS API.
70   ///
71   /// HttpStats() may return nullptr.
72   virtual HttpRequest::RequestStats* HttpStats() = 0;
73 
74   virtual ~GcsStatsInterface() = default;
75 };
76 
77 /// Google Cloud Storage implementation of a file system.
78 ///
79 /// The clients should use RetryingGcsFileSystem defined below,
80 /// which adds retry logic to GCS operations.
81 class GcsFileSystem : public FileSystem {
82  public:
83   struct TimeoutConfig;
84 
85   // Main constructor used (via RetryingFileSystem) throughout Tensorflow
86   GcsFileSystem();
87   // Used mostly for unit testing or use cases which need to customize the
88   // filesystem from defaults
89   GcsFileSystem(std::unique_ptr<AuthProvider> auth_provider,
90                 std::unique_ptr<HttpRequest::Factory> http_request_factory,
91                 std::unique_ptr<ZoneProvider> zone_provider, size_t block_size,
92                 size_t max_bytes, uint64 max_staleness,
93                 uint64 stat_cache_max_age, size_t stat_cache_max_entries,
94                 uint64 matching_paths_cache_max_age,
95                 size_t matching_paths_cache_max_entries,
96                 RetryConfig retry_config, TimeoutConfig timeouts,
97                 const std::unordered_set<string>& allowed_locations,
98                 std::pair<const string, const string>* additional_header);
99 
100   Status NewRandomAccessFile(
101       const string& filename,
102       std::unique_ptr<RandomAccessFile>* result) override;
103 
104   Status NewWritableFile(const string& fname,
105                          std::unique_ptr<WritableFile>* result) override;
106 
107   Status NewAppendableFile(const string& fname,
108                            std::unique_ptr<WritableFile>* result) override;
109 
110   Status NewReadOnlyMemoryRegionFromFile(
111       const string& filename,
112       std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
113 
114   Status FileExists(const string& fname) override;
115 
116   Status Stat(const string& fname, FileStatistics* stat) override;
117 
118   Status GetChildren(const string& dir, std::vector<string>* result) override;
119 
120   Status GetMatchingPaths(const string& pattern,
121                           std::vector<string>* results) override;
122 
123   Status DeleteFile(const string& fname) override;
124 
125   Status CreateDir(const string& dirname) override;
126 
127   Status DeleteDir(const string& dirname) override;
128 
129   Status GetFileSize(const string& fname, uint64* file_size) override;
130 
131   Status RenameFile(const string& src, const string& target) override;
132 
133   Status IsDirectory(const string& fname) override;
134 
135   Status DeleteRecursively(const string& dirname, int64* undeleted_files,
136                            int64* undeleted_dirs) override;
137 
138   void FlushCaches() override;
139 
140   /// Set an object to collect runtime statistics from the GcsFilesystem.
141   void SetStats(GcsStatsInterface* stats);
142 
143   /// These accessors are mainly for testing purposes, to verify that the
144   /// environment variables that control these parameters are handled correctly.
block_size()145   size_t block_size() {
146     tf_shared_lock l(block_cache_lock_);
147     return file_block_cache_->block_size();
148   }
max_bytes()149   size_t max_bytes() {
150     tf_shared_lock l(block_cache_lock_);
151     return file_block_cache_->max_bytes();
152   }
max_staleness()153   uint64 max_staleness() {
154     tf_shared_lock l(block_cache_lock_);
155     return file_block_cache_->max_staleness();
156   }
timeouts()157   TimeoutConfig timeouts() const { return timeouts_; }
allowed_locations()158   std::unordered_set<string> allowed_locations() const {
159     return allowed_locations_;
160   }
additional_header_name()161   string additional_header_name() const {
162     return additional_header_ ? additional_header_->first : "";
163   }
additional_header_value()164   string additional_header_value() const {
165     return additional_header_ ? additional_header_->second : "";
166   }
167 
stat_cache_max_age()168   uint64 stat_cache_max_age() const { return stat_cache_->max_age(); }
stat_cache_max_entries()169   size_t stat_cache_max_entries() const { return stat_cache_->max_entries(); }
170 
matching_paths_cache_max_age()171   uint64 matching_paths_cache_max_age() const {
172     return matching_paths_cache_->max_age();
173   }
matching_paths_cache_max_entries()174   size_t matching_paths_cache_max_entries() const {
175     return matching_paths_cache_->max_entries();
176   }
177 
178   /// Structure containing the information for timeouts related to accessing the
179   /// GCS APIs.
180   ///
181   /// All values are in seconds.
182   struct TimeoutConfig {
183     // The request connection timeout. If a connection cannot be established
184     // within `connect` seconds, abort the request.
185     uint32 connect = 120;  // 2 minutes
186 
187     // The request idle timeout. If a request has seen no activity in `idle`
188     // seconds, abort the request.
189     uint32 idle = 60;  // 1 minute
190 
191     // The maximum total time a metadata request can take. If a request has not
192     // completed within `metadata` seconds, the request is aborted.
193     uint32 metadata = 3600;  // 1 hour
194 
195     // The maximum total time a block read request can take. If a request has
196     // not completed within `read` seconds, the request is aborted.
197     uint32 read = 3600;  // 1 hour
198 
199     // The maximum total time an upload request can take. If a request has not
200     // completed within `write` seconds, the request is aborted.
201     uint32 write = 3600;  // 1 hour
202 
TimeoutConfigTimeoutConfig203     TimeoutConfig() {}
TimeoutConfigTimeoutConfig204     TimeoutConfig(uint32 connect, uint32 idle, uint32 metadata, uint32 read,
205                   uint32 write)
206         : connect(connect),
207           idle(idle),
208           metadata(metadata),
209           read(read),
210           write(write) {}
211   };
212 
213   Status CreateHttpRequest(std::unique_ptr<HttpRequest>* request);
214 
215   /// \brief Sets a new AuthProvider on the GCS FileSystem.
216   ///
217   /// The new auth provider will be used for all subsequent requests.
218   void SetAuthProvider(std::unique_ptr<AuthProvider> auth_provider);
219 
220   /// \brief Resets the block cache and re-instantiates it with the new values.
221   ///
222   /// This method can be used to clear the existing block cache and/or to
223   /// re-configure the block cache for different values.
224   ///
225   /// Note: the existing block cache is not cleaned up until all existing files
226   /// have been closed.
227   void ResetFileBlockCache(size_t block_size_bytes, size_t max_bytes,
228                            uint64 max_staleness_secs);
229 
230  private:
231   // GCS file statistics.
232   struct GcsFileStat {
233     FileStatistics base;
234     int64 generation_number = 0;
235   };
236 
237   /// \brief Checks if the bucket exists. Returns OK if the check succeeded.
238   ///
239   /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
240   Status BucketExists(const string& bucket, bool* result);
241 
242   /// \brief Retrieves the GCS bucket location. Returns OK if the location was
243   /// retrieved.
244   ///
245   /// Given a string bucket the GCS bucket metadata API will be called and the
246   /// location string filled with the location of the bucket.
247   ///
248   /// This requires the bucket metadata permission.
249   /// Repeated calls for the same bucket are cached so this function can be
250   /// called frequently without causing an extra API call
251   Status GetBucketLocation(const string& bucket, string* location);
252 
253   /// \brief Check if the GCS buckets location is allowed with the current
254   /// constraint configuration
255   Status CheckBucketLocationConstraint(const string& bucket);
256 
257   /// \brief Given the input bucket `bucket`, fills `result_buffer` with the
258   /// results of the metadata. Returns OK if the API call succeeds without
259   /// error.
260   Status GetBucketMetadata(const string& bucket,
261                            std::vector<char>* result_buffer);
262 
263   /// \brief Checks if the object exists. Returns OK if the check succeeded.
264   ///
265   /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
266   Status ObjectExists(const string& fname, const string& bucket,
267                       const string& object, bool* result);
268 
269   /// \brief Checks if the folder exists. Returns OK if the check succeeded.
270   ///
271   /// 'result' is set if the function returns OK. 'result' cannot be nullptr.
272   Status FolderExists(const string& dirname, bool* result);
273 
274   /// \brief Internal version of GetChildren with more knobs.
275   ///
276   /// If 'recursively' is true, returns all objects in all subfolders.
277   /// Otherwise only returns the immediate children in the directory.
278   ///
279   /// If 'include_self_directory_marker' is true and there is a GCS directory
280   /// marker at the path 'dir', GetChildrenBound will return an empty string
281   /// as one of the children that represents this marker.
282   Status GetChildrenBounded(const string& dir, uint64 max_results,
283                             std::vector<string>* result, bool recursively,
284                             bool include_self_directory_marker);
285 
286   /// Retrieves file statistics assuming fname points to a GCS object. The data
287   /// may be read from cache or from GCS directly.
288   Status StatForObject(const string& fname, const string& bucket,
289                        const string& object, GcsFileStat* stat);
290   /// Retrieves file statistics of file fname directly from GCS.
291   Status UncachedStatForObject(const string& fname, const string& bucket,
292                                const string& object, GcsFileStat* stat);
293 
294   Status RenameObject(const string& src, const string& target);
295 
296   std::unique_ptr<FileBlockCache> MakeFileBlockCache(size_t block_size,
297                                                      size_t max_bytes,
298                                                      uint64 max_staleness);
299 
300   /// Loads file contents from GCS for a given filename, offset, and length.
301   Status LoadBufferFromGCS(const string& filename, size_t offset, size_t n,
302                            char* buffer, size_t* bytes_transferred);
303 
304   // Clear all the caches related to the file with name `filename`.
305   void ClearFileCaches(const string& fname);
306 
307   mutex mu_;
308   std::unique_ptr<AuthProvider> auth_provider_ GUARDED_BY(mu_);
309   std::shared_ptr<HttpRequest::Factory> http_request_factory_;
310   std::unique_ptr<ZoneProvider> zone_provider_;
311   // block_cache_lock_ protects the file_block_cache_ pointer (Note that
312   // FileBlockCache instances are themselves threadsafe).
313   mutex block_cache_lock_;
314   std::unique_ptr<FileBlockCache> file_block_cache_
315       GUARDED_BY(block_cache_lock_);
316   std::shared_ptr<ComputeEngineMetadataClient> compute_engine_metadata_client_;
317   std::unique_ptr<GcsDnsCache> dns_cache_;
318   GcsThrottle throttle_;
319 
320   using StatCache = ExpiringLRUCache<GcsFileStat>;
321   std::unique_ptr<StatCache> stat_cache_;
322 
323   using MatchingPathsCache = ExpiringLRUCache<std::vector<string>>;
324   std::unique_ptr<MatchingPathsCache> matching_paths_cache_;
325 
326   using BucketLocationCache = ExpiringLRUCache<string>;
327   std::unique_ptr<BucketLocationCache> bucket_location_cache_;
328   std::unordered_set<string> allowed_locations_;
329 
330   TimeoutConfig timeouts_;
331 
332   GcsStatsInterface* stats_ = nullptr;  // Not owned.
333 
334   /// The initial delay for exponential backoffs when retrying failed calls.
335   RetryConfig retry_config_;
336 
337   // Additional header material to be transmitted with all GCS requests
338   std::unique_ptr<std::pair<const string, const string>> additional_header_;
339 
340   TF_DISALLOW_COPY_AND_ASSIGN(GcsFileSystem);
341 };
342 
343 /// Google Cloud Storage implementation of a file system with retry on failures.
344 class RetryingGcsFileSystem : public RetryingFileSystem<GcsFileSystem> {
345  public:
RetryingGcsFileSystem()346   RetryingGcsFileSystem()
347       : RetryingFileSystem(std::unique_ptr<GcsFileSystem>(new GcsFileSystem),
348                            RetryConfig(100000 /* init_delay_time_us */)) {}
349 };
350 
351 }  // namespace tensorflow
352 
353 #endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_GCS_FILE_SYSTEM_H_
354