1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
18 
19 #include <functional>
20 #include <string>
21 #include <vector>
22 
23 #include "tensorflow/core/lib/core/errors.h"
24 #include "tensorflow/core/lib/core/status.h"
25 #include "tensorflow/core/lib/random/random.h"
26 #include "tensorflow/core/platform/cloud/retrying_utils.h"
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/file_system.h"
29 
30 namespace tensorflow {
31 
32 /// A wrapper to add retry logic to another file system.
33 template <typename Underlying>
34 class RetryingFileSystem : public FileSystem {
35  public:
RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,const RetryConfig & retry_config)36   RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,
37                      const RetryConfig& retry_config)
38       : base_file_system_(std::move(base_file_system)),
39         retry_config_(retry_config) {}
40 
41   Status NewRandomAccessFile(
42       const string& filename,
43       std::unique_ptr<RandomAccessFile>* result) override;
44 
45   Status NewWritableFile(const string& fname,
46                          std::unique_ptr<WritableFile>* result) override;
47 
48   Status NewAppendableFile(const string& fname,
49                            std::unique_ptr<WritableFile>* result) override;
50 
51   Status NewReadOnlyMemoryRegionFromFile(
52       const string& filename,
53       std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
54 
FileExists(const string & fname)55   Status FileExists(const string& fname) override {
56     return RetryingUtils::CallWithRetries(
57         [this, &fname]() { return base_file_system_->FileExists(fname); },
58         retry_config_);
59   }
60 
GetChildren(const string & dir,std::vector<string> * result)61   Status GetChildren(const string& dir, std::vector<string>* result) override {
62     return RetryingUtils::CallWithRetries(
63         [this, &dir, result]() {
64           return base_file_system_->GetChildren(dir, result);
65         },
66         retry_config_);
67   }
68 
GetMatchingPaths(const string & pattern,std::vector<string> * result)69   Status GetMatchingPaths(const string& pattern,
70                           std::vector<string>* result) override {
71     return RetryingUtils::CallWithRetries(
72         [this, &pattern, result]() {
73           return base_file_system_->GetMatchingPaths(pattern, result);
74         },
75         retry_config_);
76   }
77 
Stat(const string & fname,FileStatistics * stat)78   Status Stat(const string& fname, FileStatistics* stat) override {
79     return RetryingUtils::CallWithRetries(
80         [this, &fname, stat]() { return base_file_system_->Stat(fname, stat); },
81         retry_config_);
82   }
83 
DeleteFile(const string & fname)84   Status DeleteFile(const string& fname) override {
85     return RetryingUtils::DeleteWithRetries(
86         [this, &fname]() { return base_file_system_->DeleteFile(fname); },
87         retry_config_);
88   }
89 
CreateDir(const string & dirname)90   Status CreateDir(const string& dirname) override {
91     return RetryingUtils::CallWithRetries(
92         [this, &dirname]() { return base_file_system_->CreateDir(dirname); },
93         retry_config_);
94   }
95 
DeleteDir(const string & dirname)96   Status DeleteDir(const string& dirname) override {
97     return RetryingUtils::DeleteWithRetries(
98         [this, &dirname]() { return base_file_system_->DeleteDir(dirname); },
99         retry_config_);
100   }
101 
GetFileSize(const string & fname,uint64 * file_size)102   Status GetFileSize(const string& fname, uint64* file_size) override {
103     return RetryingUtils::CallWithRetries(
104         [this, &fname, file_size]() {
105           return base_file_system_->GetFileSize(fname, file_size);
106         },
107         retry_config_);
108   }
109 
RenameFile(const string & src,const string & target)110   Status RenameFile(const string& src, const string& target) override {
111     return RetryingUtils::CallWithRetries(
112         [this, &src, &target]() {
113           return base_file_system_->RenameFile(src, target);
114         },
115         retry_config_);
116   }
117 
IsDirectory(const string & dirname)118   Status IsDirectory(const string& dirname) override {
119     return RetryingUtils::CallWithRetries(
120         [this, &dirname]() { return base_file_system_->IsDirectory(dirname); },
121         retry_config_);
122   }
123 
DeleteRecursively(const string & dirname,int64 * undeleted_files,int64 * undeleted_dirs)124   Status DeleteRecursively(const string& dirname, int64* undeleted_files,
125                            int64* undeleted_dirs) override {
126     return RetryingUtils::DeleteWithRetries(
127         [this, &dirname, undeleted_files, undeleted_dirs]() {
128           return base_file_system_->DeleteRecursively(dirname, undeleted_files,
129                                                       undeleted_dirs);
130         },
131         retry_config_);
132   }
133 
FlushCaches()134   void FlushCaches() override { base_file_system_->FlushCaches(); }
135 
underlying()136   Underlying* underlying() const { return base_file_system_.get(); }
137 
138  private:
139   std::unique_ptr<Underlying> base_file_system_;
140   const RetryConfig retry_config_;
141 
142   TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem);
143 };
144 
145 namespace retrying_internals {
146 
147 class RetryingRandomAccessFile : public RandomAccessFile {
148  public:
RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,const RetryConfig & retry_config)149   RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,
150                            const RetryConfig& retry_config)
151       : base_file_(std::move(base_file)), retry_config_(retry_config) {}
152 
Name(StringPiece * result)153   Status Name(StringPiece* result) const override {
154     return base_file_->Name(result);
155   }
156 
Read(uint64 offset,size_t n,StringPiece * result,char * scratch)157   Status Read(uint64 offset, size_t n, StringPiece* result,
158               char* scratch) const override {
159     return RetryingUtils::CallWithRetries(
160         [this, offset, n, result, scratch]() {
161           return base_file_->Read(offset, n, result, scratch);
162         },
163         retry_config_);
164   }
165 
166  private:
167   std::unique_ptr<RandomAccessFile> base_file_;
168   const RetryConfig retry_config_;
169 };
170 
171 class RetryingWritableFile : public WritableFile {
172  public:
RetryingWritableFile(std::unique_ptr<WritableFile> base_file,const RetryConfig & retry_config)173   RetryingWritableFile(std::unique_ptr<WritableFile> base_file,
174                        const RetryConfig& retry_config)
175       : base_file_(std::move(base_file)), retry_config_(retry_config) {}
176 
~RetryingWritableFile()177   ~RetryingWritableFile() override {
178     // Makes sure the retrying version of Close() is called in the destructor.
179     Close().IgnoreError();
180   }
181 
Append(StringPiece data)182   Status Append(StringPiece data) override {
183     return RetryingUtils::CallWithRetries(
184         [this, &data]() { return base_file_->Append(data); }, retry_config_);
185   }
Close()186   Status Close() override {
187     return RetryingUtils::CallWithRetries(
188         [this]() { return base_file_->Close(); }, retry_config_);
189   }
Flush()190   Status Flush() override {
191     return RetryingUtils::CallWithRetries(
192         [this]() { return base_file_->Flush(); }, retry_config_);
193   }
Name(StringPiece * result)194   Status Name(StringPiece* result) const override {
195     return base_file_->Name(result);
196   }
Sync()197   Status Sync() override {
198     return RetryingUtils::CallWithRetries(
199         [this]() { return base_file_->Sync(); }, retry_config_);
200   }
Tell(int64 * position)201   Status Tell(int64* position) override {
202     return RetryingUtils::CallWithRetries(
203         [this, &position]() { return base_file_->Tell(position); },
204         retry_config_);
205   }
206 
207  private:
208   std::unique_ptr<WritableFile> base_file_;
209   const RetryConfig retry_config_;
210 };
211 
212 }  // namespace retrying_internals
213 
214 template <typename Underlying>
NewRandomAccessFile(const string & filename,std::unique_ptr<RandomAccessFile> * result)215 Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
216     const string& filename, std::unique_ptr<RandomAccessFile>* result) {
217   std::unique_ptr<RandomAccessFile> base_file;
218   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
219       [this, &filename, &base_file]() {
220         return base_file_system_->NewRandomAccessFile(filename, &base_file);
221       },
222       retry_config_));
223   result->reset(new retrying_internals::RetryingRandomAccessFile(
224       std::move(base_file), retry_config_));
225   return Status::OK();
226 }
227 
228 template <typename Underlying>
NewWritableFile(const string & filename,std::unique_ptr<WritableFile> * result)229 Status RetryingFileSystem<Underlying>::NewWritableFile(
230     const string& filename, std::unique_ptr<WritableFile>* result) {
231   std::unique_ptr<WritableFile> base_file;
232   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
233       [this, &filename, &base_file]() {
234         return base_file_system_->NewWritableFile(filename, &base_file);
235       },
236       retry_config_));
237   result->reset(new retrying_internals::RetryingWritableFile(
238       std::move(base_file), retry_config_));
239   return Status::OK();
240 }
241 
242 template <typename Underlying>
NewAppendableFile(const string & filename,std::unique_ptr<WritableFile> * result)243 Status RetryingFileSystem<Underlying>::NewAppendableFile(
244     const string& filename, std::unique_ptr<WritableFile>* result) {
245   std::unique_ptr<WritableFile> base_file;
246   TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
247       [this, &filename, &base_file]() {
248         return base_file_system_->NewAppendableFile(filename, &base_file);
249       },
250       retry_config_));
251   result->reset(new retrying_internals::RetryingWritableFile(
252       std::move(base_file), retry_config_));
253   return Status::OK();
254 }
255 
256 template <typename Underlying>
NewReadOnlyMemoryRegionFromFile(const string & filename,std::unique_ptr<ReadOnlyMemoryRegion> * result)257 Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
258     const string& filename, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
259   return RetryingUtils::CallWithRetries(
260       [this, &filename, result]() {
261         return base_file_system_->NewReadOnlyMemoryRegionFromFile(filename,
262                                                                   result);
263       },
264       retry_config_);
265 }
266 
267 }  // namespace tensorflow
268 
269 #endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_RETRYING_FILE_SYSTEM_H_
270