/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/platform/hadoop/hadoop_file_system.h"

#include <errno.h>

#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/io/path.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/posix/error.h"
#include "third_party/hadoop/hdfs.h"
namespace tensorflow {

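// Looks up `name` in a dynamically loaded library and binds the resolved
// symbol to `func` with the expected signature.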
template <typename R, typename... Args>
Status BindFunc(void* handle, const char* name,
                std::function<R(Args...)>* func) {
  void* symbol_ptr = nullptr;
  TF_RETURN_IF_ERROR(
      Env::Default()->GetSymbolFromLibrary(handle, name, &symbol_ptr));
  *func = reinterpret_cast<R (*)(Args...)>(symbol_ptr);
  return Status::OK();
}

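// Loads libhdfs at runtime and binds the subset of the HDFS C API that this
// file system uses.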
class LibHDFS {
 public:
  static LibHDFS* Load() {
    static LibHDFS* lib = []() -> LibHDFS* {
      LibHDFS* lib = new LibHDFS;
      lib->LoadAndBind();
      return lib;
    }();

    return lib;
  }

  // The status, if any, from failure to load.
  Status status() { return status_; }

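  // Pointers to the HDFS C API functions, bound by LoadAndBind().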
  std::function<hdfsFS(hdfsBuilder*)> hdfsBuilderConnect;
  std::function<hdfsBuilder*()> hdfsNewBuilder;
  std::function<void(hdfsBuilder*, const char*)> hdfsBuilderSetNameNode;
  std::function<int(const char*, char**)> hdfsConfGetStr;
  std::function<void(hdfsBuilder*, const char* kerbTicketCachePath)>
      hdfsBuilderSetKerbTicketCachePath;
  std::function<int(hdfsFS, hdfsFile)> hdfsCloseFile;
  std::function<tSize(hdfsFS, hdfsFile, tOffset, void*, tSize)> hdfsPread;
  std::function<tSize(hdfsFS, hdfsFile, const void*, tSize)> hdfsWrite;
  std::function<int(hdfsFS, hdfsFile)> hdfsHFlush;
  std::function<int(hdfsFS, hdfsFile)> hdfsHSync;
  std::function<hdfsFile(hdfsFS, const char*, int, int, short, tSize)>
      hdfsOpenFile;
  std::function<int(hdfsFS, const char*)> hdfsExists;
  std::function<hdfsFileInfo*(hdfsFS, const char*, int*)> hdfsListDirectory;
  std::function<void(hdfsFileInfo*, int)> hdfsFreeFileInfo;
  std::function<int(hdfsFS, const char*, int recursive)> hdfsDelete;
  std::function<int(hdfsFS, const char*)> hdfsCreateDirectory;
  std::function<hdfsFileInfo*(hdfsFS, const char*)> hdfsGetPathInfo;
  std::function<int(hdfsFS, const char*, const char*)> hdfsRename;

 private:
  void LoadAndBind() {
    auto TryLoadAndBind = [this](const char* name, void** handle) -> Status {
      TF_RETURN_IF_ERROR(Env::Default()->LoadLibrary(name, handle));
#define BIND_HDFS_FUNC(function) \
  TF_RETURN_IF_ERROR(BindFunc(*handle, #function, &function));

      BIND_HDFS_FUNC(hdfsBuilderConnect);
      BIND_HDFS_FUNC(hdfsNewBuilder);
      BIND_HDFS_FUNC(hdfsBuilderSetNameNode);
      BIND_HDFS_FUNC(hdfsConfGetStr);
      BIND_HDFS_FUNC(hdfsBuilderSetKerbTicketCachePath);
      BIND_HDFS_FUNC(hdfsCloseFile);
      BIND_HDFS_FUNC(hdfsPread);
      BIND_HDFS_FUNC(hdfsWrite);
      BIND_HDFS_FUNC(hdfsHFlush);
      BIND_HDFS_FUNC(hdfsHSync);
      BIND_HDFS_FUNC(hdfsOpenFile);
      BIND_HDFS_FUNC(hdfsExists);
      BIND_HDFS_FUNC(hdfsListDirectory);
      BIND_HDFS_FUNC(hdfsFreeFileInfo);
      BIND_HDFS_FUNC(hdfsDelete);
      BIND_HDFS_FUNC(hdfsCreateDirectory);
      BIND_HDFS_FUNC(hdfsGetPathInfo);
      BIND_HDFS_FUNC(hdfsRename);
#undef BIND_HDFS_FUNC
      return Status::OK();
    };

// libhdfs.so won't be in the standard locations. Use the path as specified
// in the libhdfs documentation.
#if defined(PLATFORM_WINDOWS)
    const char* kLibHdfsDso = "hdfs.dll";
#else
    const char* kLibHdfsDso = "libhdfs.so";
#endif
    char* hdfs_home = getenv("HADOOP_HDFS_HOME");
    if (hdfs_home == nullptr) {
      status_ = errors::FailedPrecondition(
          "Environment variable HADOOP_HDFS_HOME not set");
      return;
    }
    string path = io::JoinPath(hdfs_home, "lib", "native", kLibHdfsDso);
    status_ = TryLoadAndBind(path.c_str(), &handle_);
    if (!status_.ok()) {
      // Try to load libhdfs.so via the dynamic loader's search path, in case
      // libhdfs.so is installed in a non-standard location.
      status_ = TryLoadAndBind(kLibHdfsDso, &handle_);
    }
  }

  Status status_;
  void* handle_ = nullptr;
};

HadoopFileSystem::HadoopFileSystem() : hdfs_(LibHDFS::Load()) {}

HadoopFileSystem::~HadoopFileSystem() {}

// We rely on HDFS connection caching here. The HDFS client calls
// org.apache.hadoop.fs.FileSystem.get(), which caches the connection
// internally.
Status HadoopFileSystem::Connect(StringPiece fname, hdfsFS* fs) {
  TF_RETURN_IF_ERROR(hdfs_->status());

  StringPiece scheme, namenode, path;
  io::ParseURI(fname, &scheme, &namenode, &path);
  const string nn = namenode.ToString();

  hdfsBuilder* builder = hdfs_->hdfsNewBuilder();
  if (scheme == "file") {
    hdfs_->hdfsBuilderSetNameNode(builder, nullptr);
  } else if (scheme == "viewfs") {
    char* defaultFS = nullptr;
    hdfs_->hdfsConfGetStr("fs.defaultFS", &defaultFS);
    StringPiece defaultScheme, defaultCluster, defaultPath;
    io::ParseURI(defaultFS, &defaultScheme, &defaultCluster, &defaultPath);

    if (scheme != defaultScheme || namenode != defaultCluster) {
      return errors::Unimplemented(
          "viewfs is only supported as a fs.defaultFS.");
    }
    // The default NameNode configuration will be used (from the XML
    // configuration files). See:
    // https://github.com/tensorflow/tensorflow/blob/v1.0.0/third_party/hadoop/hdfs.h#L259
    hdfs_->hdfsBuilderSetNameNode(builder, "default");
  } else {
    hdfs_->hdfsBuilderSetNameNode(builder, nn.c_str());
  }
  // KERB_TICKET_CACHE_PATH will be removed in the future, because KRB5CCNAME
  // is Kerberos's built-in environment variable, which makes
  // KERB_TICKET_CACHE_PATH and the related code unnecessary.
  char* ticket_cache_path = getenv("KERB_TICKET_CACHE_PATH");
  if (ticket_cache_path != nullptr) {
    hdfs_->hdfsBuilderSetKerbTicketCachePath(builder, ticket_cache_path);
  }
  *fs = hdfs_->hdfsBuilderConnect(builder);
  if (*fs == nullptr) {
    return errors::NotFound(strerror(errno));
  }
  return Status::OK();
}

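// Strips the scheme and namenode from an HDFS URI, returning only the path.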
string HadoopFileSystem::TranslateName(const string& name) const {
  StringPiece scheme, namenode, path;
  io::ParseURI(name, &scheme, &namenode, &path);
  return path.ToString();
}

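// A RandomAccessFile backed by positional reads (hdfsPread) on an open HDFS
// file handle.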
class HDFSRandomAccessFile : public RandomAccessFile {
 public:
  HDFSRandomAccessFile(const string& filename, const string& hdfs_filename,
                       LibHDFS* hdfs, hdfsFS fs, hdfsFile file)
      : filename_(filename),
        hdfs_filename_(hdfs_filename),
        hdfs_(hdfs),
        fs_(fs),
        file_(file) {}

  ~HDFSRandomAccessFile() override {
    if (file_ != nullptr) {
      mutex_lock lock(mu_);
      hdfs_->hdfsCloseFile(fs_, file_);
    }
  }

  Status Read(uint64 offset, size_t n, StringPiece* result,
              char* scratch) const override {
    Status s;
    char* dst = scratch;
    bool eof_retried = false;
    while (n > 0 && s.ok()) {
      // We lock inside the loop rather than outside so we don't block other
      // concurrent readers.
      mutex_lock lock(mu_);
      tSize r = hdfs_->hdfsPread(fs_, file_, static_cast<tOffset>(offset), dst,
                                 static_cast<tSize>(n));
      if (r > 0) {
        dst += r;
        n -= r;
        offset += r;
      } else if (!eof_retried && r == 0) {
        // Always reopen the file upon reaching EOF to see if there's more data.
        // If writers are streaming contents while others are concurrently
        // reading, HDFS requires that we reopen the file to see updated
        // contents.
        //
        // Fixes #5438
        if (file_ != nullptr && hdfs_->hdfsCloseFile(fs_, file_) != 0) {
          return IOError(filename_, errno);
        }
        file_ =
            hdfs_->hdfsOpenFile(fs_, hdfs_filename_.c_str(), O_RDONLY, 0, 0, 0);
        if (file_ == nullptr) {
          return IOError(filename_, errno);
        }
        eof_retried = true;
      } else if (eof_retried && r == 0) {
        s = Status(error::OUT_OF_RANGE, "Read fewer bytes than requested");
      } else if (errno == EINTR || errno == EAGAIN) {
        // hdfsPread may return EINTR too. Just retry.
      } else {
        s = IOError(filename_, errno);
      }
    }
    *result = StringPiece(scratch, dst - scratch);
    return s;
  }

 private:
  string filename_;
  string hdfs_filename_;
  LibHDFS* hdfs_;
  hdfsFS fs_;

  mutable mutex mu_;
  mutable hdfsFile file_ GUARDED_BY(mu_);
};

Status HadoopFileSystem::NewRandomAccessFile(
    const string& fname, std::unique_ptr<RandomAccessFile>* result) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  hdfsFile file =
      hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_RDONLY, 0, 0, 0);
  if (file == nullptr) {
    return IOError(fname, errno);
  }
  result->reset(
      new HDFSRandomAccessFile(fname, TranslateName(fname), hdfs_, fs, file));
  return Status::OK();
}

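// A WritableFile that appends to an open HDFS file handle.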
class HDFSWritableFile : public WritableFile {
 public:
  HDFSWritableFile(const string& fname, LibHDFS* hdfs, hdfsFS fs, hdfsFile file)
      : filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {}

  ~HDFSWritableFile() override {
    if (file_ != nullptr) {
      Close().IgnoreError();
    }
  }

  Status Append(const StringPiece& data) override {
    if (hdfs_->hdfsWrite(fs_, file_, data.data(),
                         static_cast<tSize>(data.size())) == -1) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  Status Close() override {
    Status result;
    if (hdfs_->hdfsCloseFile(fs_, file_) != 0) {
      result = IOError(filename_, errno);
    }
    hdfs_ = nullptr;
    fs_ = nullptr;
    file_ = nullptr;
    return result;
  }

  Status Flush() override {
    if (hdfs_->hdfsHFlush(fs_, file_) != 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

  Status Sync() override {
    if (hdfs_->hdfsHSync(fs_, file_) != 0) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  string filename_;
  LibHDFS* hdfs_;
  hdfsFS fs_;
  hdfsFile file_;
};

Status HadoopFileSystem::NewWritableFile(
    const string& fname, std::unique_ptr<WritableFile>* result) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  hdfsFile file =
      hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_WRONLY, 0, 0, 0);
  if (file == nullptr) {
    return IOError(fname, errno);
  }
  result->reset(new HDFSWritableFile(fname, hdfs_, fs, file));
  return Status::OK();
}

Status HadoopFileSystem::NewAppendableFile(
    const string& fname, std::unique_ptr<WritableFile>* result) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  hdfsFile file = hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(),
                                      O_WRONLY | O_APPEND, 0, 0, 0);
  if (file == nullptr) {
    return IOError(fname, errno);
  }
  result->reset(new HDFSWritableFile(fname, hdfs_, fs, file));
  return Status::OK();
}

Status HadoopFileSystem::NewReadOnlyMemoryRegionFromFile(
    const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
  // hadoopReadZero() technically supports this call with the following
  // caveats:
  // - It only works up to 2 GB. We'd have to Stat() the file to ensure that
  //   it fits.
  // - If not on the local filesystem, the entire file will be read, making
  //   it inefficient for callers that assume typical mmap() behavior.
  return errors::Unimplemented("HDFS does not support ReadOnlyMemoryRegion");
}

Status HadoopFileSystem::FileExists(const string& fname) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));
  if (hdfs_->hdfsExists(fs, TranslateName(fname).c_str()) == 0) {
    return Status::OK();
  }
  return errors::NotFound(fname, " not found.");
}

Status HadoopFileSystem::GetChildren(const string& dir,
                                     std::vector<string>* result) {
  result->clear();
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(dir, &fs));

  // hdfsListDirectory returns nullptr if the directory is empty. Do a separate
  // check to verify the directory exists first.
  FileStatistics stat;
  TF_RETURN_IF_ERROR(Stat(dir, &stat));

  int entries = 0;
  hdfsFileInfo* info =
      hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries);
  if (info == nullptr) {
    if (stat.is_directory) {
      // Assume it's an empty directory.
      return Status::OK();
    }
    return IOError(dir, errno);
  }
  for (int i = 0; i < entries; i++) {
    result->push_back(io::Basename(info[i].mName).ToString());
  }
  hdfs_->hdfsFreeFileInfo(info, entries);
  return Status::OK();
}

Status HadoopFileSystem::DeleteFile(const string& fname) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  if (hdfs_->hdfsDelete(fs, TranslateName(fname).c_str(),
                        /*recursive=*/0) != 0) {
    return IOError(fname, errno);
  }
  return Status::OK();
}

Status HadoopFileSystem::CreateDir(const string& dir) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(dir, &fs));

  if (hdfs_->hdfsCreateDirectory(fs, TranslateName(dir).c_str()) != 0) {
    return IOError(dir, errno);
  }
  return Status::OK();
}

Status HadoopFileSystem::DeleteDir(const string& dir) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(dir, &fs));

  // Count the number of entries in the directory, and only delete if it's
  // empty. This is consistent with the interface, but note that there's a
  // race condition where a file may be added after this check, in which
  // case the directory will still be deleted.
  int entries = 0;
  hdfsFileInfo* info =
      hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries);
  if (info != nullptr) {
    hdfs_->hdfsFreeFileInfo(info, entries);
  }
  // Due to HDFS bug HDFS-8407, we can't distinguish between an error and an
  // empty folder, especially in Kerberos-enabled setups: EAGAIN is quite
  // common even when the call is actually successful. Check again with Stat.
  if (info == nullptr && errno != 0) {
    FileStatistics stat;
    TF_RETURN_IF_ERROR(Stat(dir, &stat));
  }

  if (entries > 0) {
    return errors::FailedPrecondition("Cannot delete a non-empty directory.");
  }
  if (hdfs_->hdfsDelete(fs, TranslateName(dir).c_str(),
                        /*recursive=*/1) != 0) {
    return IOError(dir, errno);
  }
  return Status::OK();
}

Status HadoopFileSystem::GetFileSize(const string& fname, uint64* size) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str());
  if (info == nullptr) {
    return IOError(fname, errno);
  }
  *size = static_cast<uint64>(info->mSize);
  hdfs_->hdfsFreeFileInfo(info, 1);
  return Status::OK();
}

Status HadoopFileSystem::RenameFile(const string& src, const string& target) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(src, &fs));

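  // hdfsRename fails if the target already exists, so delete it first to get
  // overwrite semantics.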
  if (hdfs_->hdfsExists(fs, TranslateName(target).c_str()) == 0 &&
      hdfs_->hdfsDelete(fs, TranslateName(target).c_str(),
                        /*recursive=*/0) != 0) {
    return IOError(target, errno);
  }

  if (hdfs_->hdfsRename(fs, TranslateName(src).c_str(),
                        TranslateName(target).c_str()) != 0) {
    return IOError(src, errno);
  }
  return Status::OK();
}

Status HadoopFileSystem::Stat(const string& fname, FileStatistics* stats) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(fname, &fs));

  hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str());
  if (info == nullptr) {
    return IOError(fname, errno);
  }
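  // mLastMod is in seconds since the epoch; FileStatistics expects
  // nanoseconds.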
  stats->length = static_cast<int64>(info->mSize);
  stats->mtime_nsec = static_cast<int64>(info->mLastMod) * 1e9;
  stats->is_directory = info->mKind == kObjectKindDirectory;
  hdfs_->hdfsFreeFileInfo(info, 1);
  return Status::OK();
}

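// Serve both hdfs:// and viewfs:// URIs with this file system.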
REGISTER_FILE_SYSTEM("hdfs", HadoopFileSystem);
REGISTER_FILE_SYSTEM("viewfs", HadoopFileSystem);

}  // namespace tensorflow