1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/platform/hadoop/hadoop_file_system.h"
17
18 #include <errno.h>
19
20 #include "tensorflow/core/lib/core/status.h"
21 #include "tensorflow/core/lib/io/path.h"
22 #include "tensorflow/core/lib/strings/strcat.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/file_system.h"
25 #include "tensorflow/core/platform/logging.h"
26 #include "tensorflow/core/platform/mutex.h"
27 #include "tensorflow/core/platform/posix/error.h"
28 #include "third_party/hadoop/hdfs.h"
29
30 namespace tensorflow {
31
32 template <typename R, typename... Args>
BindFunc(void * handle,const char * name,std::function<R (Args...)> * func)33 Status BindFunc(void* handle, const char* name,
34 std::function<R(Args...)>* func) {
35 void* symbol_ptr = nullptr;
36 TF_RETURN_IF_ERROR(
37 Env::Default()->GetSymbolFromLibrary(handle, name, &symbol_ptr));
38 *func = reinterpret_cast<R (*)(Args...)>(symbol_ptr);
39 return Status::OK();
40 }
41
42 class LibHDFS {
43 public:
Load()44 static LibHDFS* Load() {
45 static LibHDFS* lib = []() -> LibHDFS* {
46 LibHDFS* lib = new LibHDFS;
47 lib->LoadAndBind();
48 return lib;
49 }();
50
51 return lib;
52 }
53
54 // The status, if any, from failure to load.
status()55 Status status() { return status_; }
56
57 std::function<hdfsFS(hdfsBuilder*)> hdfsBuilderConnect;
58 std::function<hdfsBuilder*()> hdfsNewBuilder;
59 std::function<void(hdfsBuilder*, const char*)> hdfsBuilderSetNameNode;
60 std::function<int(const char*, char**)> hdfsConfGetStr;
61 std::function<void(hdfsBuilder*, const char* kerbTicketCachePath)>
62 hdfsBuilderSetKerbTicketCachePath;
63 std::function<int(hdfsFS, hdfsFile)> hdfsCloseFile;
64 std::function<tSize(hdfsFS, hdfsFile, tOffset, void*, tSize)> hdfsPread;
65 std::function<tSize(hdfsFS, hdfsFile, const void*, tSize)> hdfsWrite;
66 std::function<int(hdfsFS, hdfsFile)> hdfsHFlush;
67 std::function<int(hdfsFS, hdfsFile)> hdfsHSync;
68 std::function<hdfsFile(hdfsFS, const char*, int, int, short, tSize)>
69 hdfsOpenFile;
70 std::function<int(hdfsFS, const char*)> hdfsExists;
71 std::function<hdfsFileInfo*(hdfsFS, const char*, int*)> hdfsListDirectory;
72 std::function<void(hdfsFileInfo*, int)> hdfsFreeFileInfo;
73 std::function<int(hdfsFS, const char*, int recursive)> hdfsDelete;
74 std::function<int(hdfsFS, const char*)> hdfsCreateDirectory;
75 std::function<hdfsFileInfo*(hdfsFS, const char*)> hdfsGetPathInfo;
76 std::function<int(hdfsFS, const char*, const char*)> hdfsRename;
77
78 private:
LoadAndBind()79 void LoadAndBind() {
80 auto TryLoadAndBind = [this](const char* name, void** handle) -> Status {
81 TF_RETURN_IF_ERROR(Env::Default()->LoadLibrary(name, handle));
82 #define BIND_HDFS_FUNC(function) \
83 TF_RETURN_IF_ERROR(BindFunc(*handle, #function, &function));
84
85 BIND_HDFS_FUNC(hdfsBuilderConnect);
86 BIND_HDFS_FUNC(hdfsNewBuilder);
87 BIND_HDFS_FUNC(hdfsBuilderSetNameNode);
88 BIND_HDFS_FUNC(hdfsConfGetStr);
89 BIND_HDFS_FUNC(hdfsBuilderSetKerbTicketCachePath);
90 BIND_HDFS_FUNC(hdfsCloseFile);
91 BIND_HDFS_FUNC(hdfsPread);
92 BIND_HDFS_FUNC(hdfsWrite);
93 BIND_HDFS_FUNC(hdfsHFlush);
94 BIND_HDFS_FUNC(hdfsHSync);
95 BIND_HDFS_FUNC(hdfsOpenFile);
96 BIND_HDFS_FUNC(hdfsExists);
97 BIND_HDFS_FUNC(hdfsListDirectory);
98 BIND_HDFS_FUNC(hdfsFreeFileInfo);
99 BIND_HDFS_FUNC(hdfsDelete);
100 BIND_HDFS_FUNC(hdfsCreateDirectory);
101 BIND_HDFS_FUNC(hdfsGetPathInfo);
102 BIND_HDFS_FUNC(hdfsRename);
103 #undef BIND_HDFS_FUNC
104 return Status::OK();
105 };
106
107 // libhdfs.so won't be in the standard locations. Use the path as specified
108 // in the libhdfs documentation.
109 #if defined(PLATFORM_WINDOWS)
110 const char* kLibHdfsDso = "hdfs.dll";
111 #else
112 const char* kLibHdfsDso = "libhdfs.so";
113 #endif
114 char* hdfs_home = getenv("HADOOP_HDFS_HOME");
115 if (hdfs_home == nullptr) {
116 status_ = errors::FailedPrecondition(
117 "Environment variable HADOOP_HDFS_HOME not set");
118 return;
119 }
120 string path = io::JoinPath(hdfs_home, "lib", "native", kLibHdfsDso);
121 status_ = TryLoadAndBind(path.c_str(), &handle_);
122 if (!status_.ok()) {
123 // try load libhdfs.so using dynamic loader's search path in case
124 // libhdfs.so is installed in non-standard location
125 status_ = TryLoadAndBind(kLibHdfsDso, &handle_);
126 }
127 }
128
129 Status status_;
130 void* handle_ = nullptr;
131 };
132
// Acquires the shared libhdfs binding; a load failure is surfaced later via
// hdfs_->status() when the first operation calls Connect().
HadoopFileSystem::HadoopFileSystem() : hdfs_(LibHDFS::Load()) {}
134
// No teardown needed: hdfs_ points at the process-wide LibHDFS singleton,
// and HDFS connections are cached/owned by the HDFS client itself.
HadoopFileSystem::~HadoopFileSystem() {}
136
137 // We rely on HDFS connection caching here. The HDFS client calls
138 // org.apache.hadoop.fs.FileSystem.get(), which caches the connection
139 // internally.
Connect(StringPiece fname,hdfsFS * fs)140 Status HadoopFileSystem::Connect(StringPiece fname, hdfsFS* fs) {
141 TF_RETURN_IF_ERROR(hdfs_->status());
142
143 StringPiece scheme, namenode, path;
144 io::ParseURI(fname, &scheme, &namenode, &path);
145 const string nn = namenode.ToString();
146
147 hdfsBuilder* builder = hdfs_->hdfsNewBuilder();
148 if (scheme == "file") {
149 hdfs_->hdfsBuilderSetNameNode(builder, nullptr);
150 } else if (scheme == "viewfs") {
151 char* defaultFS = nullptr;
152 hdfs_->hdfsConfGetStr("fs.defaultFS", &defaultFS);
153 StringPiece defaultScheme, defaultCluster, defaultPath;
154 io::ParseURI(defaultFS, &defaultScheme, &defaultCluster, &defaultPath);
155
156 if (scheme != defaultScheme || namenode != defaultCluster) {
157 return errors::Unimplemented(
158 "viewfs is only supported as a fs.defaultFS.");
159 }
160 // The default NameNode configuration will be used (from the XML
161 // configuration files). See:
162 // https://github.com/tensorflow/tensorflow/blob/v1.0.0/third_party/hadoop/hdfs.h#L259
163 hdfs_->hdfsBuilderSetNameNode(builder, "default");
164 } else {
165 hdfs_->hdfsBuilderSetNameNode(builder, nn.c_str());
166 }
167 // KERB_TICKET_CACHE_PATH will be deleted in the future, Because KRB5CCNAME is
168 // the build in environment variable of Kerberos, so KERB_TICKET_CACHE_PATH
169 // and related code are unnecessary.
170 char* ticket_cache_path = getenv("KERB_TICKET_CACHE_PATH");
171 if (ticket_cache_path != nullptr) {
172 hdfs_->hdfsBuilderSetKerbTicketCachePath(builder, ticket_cache_path);
173 }
174 *fs = hdfs_->hdfsBuilderConnect(builder);
175 if (*fs == nullptr) {
176 return errors::NotFound(strerror(errno));
177 }
178 return Status::OK();
179 }
180
// Strips the scheme and namenode authority from `name`, returning only the
// path component that the libhdfs calls expect.
string HadoopFileSystem::TranslateName(const string& name) const {
  StringPiece uri_scheme, uri_host, uri_path;
  io::ParseURI(name, &uri_scheme, &uri_host, &uri_path);
  return uri_path.ToString();
}
186
// RandomAccessFile backed by an open hdfsFile handle.  The handle is closed
// in the destructor.  Reads are thread-safe: the handle is guarded by mu_
// because an EOF may force the file to be reopened mid-read (see Read()).
class HDFSRandomAccessFile : public RandomAccessFile {
 public:
  // `filename` is the original URI (used in error messages); `hdfs_filename`
  // is the translated path used to reopen the file on EOF.  Takes ownership
  // of `file`; `hdfs` and `fs` are borrowed and must outlive this object.
  HDFSRandomAccessFile(const string& filename, const string& hdfs_filename,
                       LibHDFS* hdfs, hdfsFS fs, hdfsFile file)
      : filename_(filename),
        hdfs_filename_(hdfs_filename),
        hdfs_(hdfs),
        fs_(fs),
        file_(file) {}

  ~HDFSRandomAccessFile() override {
    if (file_ != nullptr) {
      mutex_lock lock(mu_);
      hdfs_->hdfsCloseFile(fs_, file_);
    }
  }

  // Reads up to `n` bytes starting at `offset` into `scratch`, setting
  // `*result` to the bytes actually read.  Returns OUT_OF_RANGE if fewer
  // than `n` bytes are available even after one reopen-on-EOF retry.
  Status Read(uint64 offset, size_t n, StringPiece* result,
              char* scratch) const override {
    Status s;
    char* dst = scratch;
    bool eof_retried = false;
    while (n > 0 && s.ok()) {
      // We lock inside the loop rather than outside so we don't block other
      // concurrent readers.
      mutex_lock lock(mu_);
      tSize r = hdfs_->hdfsPread(fs_, file_, static_cast<tOffset>(offset), dst,
                                 static_cast<tSize>(n));
      if (r > 0) {
        // Partial read: advance the cursor and keep looping.
        dst += r;
        n -= r;
        offset += r;
      } else if (!eof_retried && r == 0) {
        // Always reopen the file upon reaching EOF to see if there's more data.
        // If writers are streaming contents while others are concurrently
        // reading, HDFS requires that we reopen the file to see updated
        // contents.
        //
        // Fixes #5438
        if (file_ != nullptr && hdfs_->hdfsCloseFile(fs_, file_) != 0) {
          return IOError(filename_, errno);
        }
        file_ =
            hdfs_->hdfsOpenFile(fs_, hdfs_filename_.c_str(), O_RDONLY, 0, 0, 0);
        if (file_ == nullptr) {
          return IOError(filename_, errno);
        }
        // Only retry the EOF reopen once per Read() call.
        eof_retried = true;
      } else if (eof_retried && r == 0) {
        // Still at EOF after reopening: genuinely short read.
        s = Status(error::OUT_OF_RANGE, "Read less bytes than requested");
      } else if (errno == EINTR || errno == EAGAIN) {
        // hdfsPread may return EINTR too. Just retry.
      } else {
        s = IOError(filename_, errno);
      }
    }
    *result = StringPiece(scratch, dst - scratch);
    return s;
  }

 private:
  string filename_;       // Original URI, for error messages.
  string hdfs_filename_;  // Translated path, for reopen-on-EOF.
  LibHDFS* hdfs_;         // Not owned.
  hdfsFS fs_;             // Not owned.

  mutable mutex mu_;
  // Mutable because Read() is const but may replace the handle on EOF.
  mutable hdfsFile file_ GUARDED_BY(mu_);
};
256
NewRandomAccessFile(const string & fname,std::unique_ptr<RandomAccessFile> * result)257 Status HadoopFileSystem::NewRandomAccessFile(
258 const string& fname, std::unique_ptr<RandomAccessFile>* result) {
259 hdfsFS fs = nullptr;
260 TF_RETURN_IF_ERROR(Connect(fname, &fs));
261
262 hdfsFile file =
263 hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_RDONLY, 0, 0, 0);
264 if (file == nullptr) {
265 return IOError(fname, errno);
266 }
267 result->reset(
268 new HDFSRandomAccessFile(fname, TranslateName(fname), hdfs_, fs, file));
269 return Status::OK();
270 }
271
272 class HDFSWritableFile : public WritableFile {
273 public:
HDFSWritableFile(const string & fname,LibHDFS * hdfs,hdfsFS fs,hdfsFile file)274 HDFSWritableFile(const string& fname, LibHDFS* hdfs, hdfsFS fs, hdfsFile file)
275 : filename_(fname), hdfs_(hdfs), fs_(fs), file_(file) {}
276
~HDFSWritableFile()277 ~HDFSWritableFile() override {
278 if (file_ != nullptr) {
279 Close().IgnoreError();
280 }
281 }
282
Append(const StringPiece & data)283 Status Append(const StringPiece& data) override {
284 if (hdfs_->hdfsWrite(fs_, file_, data.data(),
285 static_cast<tSize>(data.size())) == -1) {
286 return IOError(filename_, errno);
287 }
288 return Status::OK();
289 }
290
Close()291 Status Close() override {
292 Status result;
293 if (hdfs_->hdfsCloseFile(fs_, file_) != 0) {
294 result = IOError(filename_, errno);
295 }
296 hdfs_ = nullptr;
297 fs_ = nullptr;
298 file_ = nullptr;
299 return result;
300 }
301
Flush()302 Status Flush() override {
303 if (hdfs_->hdfsHFlush(fs_, file_) != 0) {
304 return IOError(filename_, errno);
305 }
306 return Status::OK();
307 }
308
Sync()309 Status Sync() override {
310 if (hdfs_->hdfsHSync(fs_, file_) != 0) {
311 return IOError(filename_, errno);
312 }
313 return Status::OK();
314 }
315
316 private:
317 string filename_;
318 LibHDFS* hdfs_;
319 hdfsFS fs_;
320 hdfsFile file_;
321 };
322
NewWritableFile(const string & fname,std::unique_ptr<WritableFile> * result)323 Status HadoopFileSystem::NewWritableFile(
324 const string& fname, std::unique_ptr<WritableFile>* result) {
325 hdfsFS fs = nullptr;
326 TF_RETURN_IF_ERROR(Connect(fname, &fs));
327
328 hdfsFile file =
329 hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(), O_WRONLY, 0, 0, 0);
330 if (file == nullptr) {
331 return IOError(fname, errno);
332 }
333 result->reset(new HDFSWritableFile(fname, hdfs_, fs, file));
334 return Status::OK();
335 }
336
NewAppendableFile(const string & fname,std::unique_ptr<WritableFile> * result)337 Status HadoopFileSystem::NewAppendableFile(
338 const string& fname, std::unique_ptr<WritableFile>* result) {
339 hdfsFS fs = nullptr;
340 TF_RETURN_IF_ERROR(Connect(fname, &fs));
341
342 hdfsFile file = hdfs_->hdfsOpenFile(fs, TranslateName(fname).c_str(),
343 O_WRONLY | O_APPEND, 0, 0, 0);
344 if (file == nullptr) {
345 return IOError(fname, errno);
346 }
347 result->reset(new HDFSWritableFile(fname, hdfs_, fs, file));
348 return Status::OK();
349 }
350
NewReadOnlyMemoryRegionFromFile(const string & fname,std::unique_ptr<ReadOnlyMemoryRegion> * result)351 Status HadoopFileSystem::NewReadOnlyMemoryRegionFromFile(
352 const string& fname, std::unique_ptr<ReadOnlyMemoryRegion>* result) {
353 // hadoopReadZero() technically supports this call with the following
354 // caveats:
355 // - It only works up to 2 GB. We'd have to Stat() the file to ensure that
356 // it fits.
357 // - If not on the local filesystem, the entire file will be read, making
358 // it inefficient for callers that assume typical mmap() behavior.
359 return errors::Unimplemented("HDFS does not support ReadOnlyMemoryRegion");
360 }
361
FileExists(const string & fname)362 Status HadoopFileSystem::FileExists(const string& fname) {
363 hdfsFS fs = nullptr;
364 TF_RETURN_IF_ERROR(Connect(fname, &fs));
365 if (hdfs_->hdfsExists(fs, TranslateName(fname).c_str()) == 0) {
366 return Status::OK();
367 }
368 return errors::NotFound(fname, " not found.");
369 }
370
GetChildren(const string & dir,std::vector<string> * result)371 Status HadoopFileSystem::GetChildren(const string& dir,
372 std::vector<string>* result) {
373 result->clear();
374 hdfsFS fs = nullptr;
375 TF_RETURN_IF_ERROR(Connect(dir, &fs));
376
377 // hdfsListDirectory returns nullptr if the directory is empty. Do a separate
378 // check to verify the directory exists first.
379 FileStatistics stat;
380 TF_RETURN_IF_ERROR(Stat(dir, &stat));
381
382 int entries = 0;
383 hdfsFileInfo* info =
384 hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries);
385 if (info == nullptr) {
386 if (stat.is_directory) {
387 // Assume it's an empty directory.
388 return Status::OK();
389 }
390 return IOError(dir, errno);
391 }
392 for (int i = 0; i < entries; i++) {
393 result->push_back(io::Basename(info[i].mName).ToString());
394 }
395 hdfs_->hdfsFreeFileInfo(info, entries);
396 return Status::OK();
397 }
398
DeleteFile(const string & fname)399 Status HadoopFileSystem::DeleteFile(const string& fname) {
400 hdfsFS fs = nullptr;
401 TF_RETURN_IF_ERROR(Connect(fname, &fs));
402
403 if (hdfs_->hdfsDelete(fs, TranslateName(fname).c_str(),
404 /*recursive=*/0) != 0) {
405 return IOError(fname, errno);
406 }
407 return Status::OK();
408 }
409
CreateDir(const string & dir)410 Status HadoopFileSystem::CreateDir(const string& dir) {
411 hdfsFS fs = nullptr;
412 TF_RETURN_IF_ERROR(Connect(dir, &fs));
413
414 if (hdfs_->hdfsCreateDirectory(fs, TranslateName(dir).c_str()) != 0) {
415 return IOError(dir, errno);
416 }
417 return Status::OK();
418 }
419
// Deletes directory `dir`, failing with FailedPrecondition if it is
// non-empty.  The emptiness check and the errno-based workaround below are
// deliberately ordered; do not reorder.
Status HadoopFileSystem::DeleteDir(const string& dir) {
  hdfsFS fs = nullptr;
  TF_RETURN_IF_ERROR(Connect(dir, &fs));

  // Count the number of entries in the directory, and only delete if it's
  // non-empty. This is consistent with the interface, but note that there's
  // a race condition where a file may be added after this check, in which
  // case the directory will still be deleted.
  int entries = 0;
  hdfsFileInfo* info =
      hdfs_->hdfsListDirectory(fs, TranslateName(dir).c_str(), &entries);
  if (info != nullptr) {
    hdfs_->hdfsFreeFileInfo(info, entries);
  }
  // Due to HDFS bug HDFS-8407, we can't distinguish between an error and empty
  // folder, expscially for Kerberos enable setup, EAGAIN is quite common when
  // the call is actually successful. Check again by Stat.
  if (info == nullptr && errno != 0) {
    FileStatistics stat;
    // If Stat also fails, propagate that error; otherwise treat the null
    // listing as an empty directory and fall through.
    TF_RETURN_IF_ERROR(Stat(dir, &stat));
  }

  if (entries > 0) {
    return errors::FailedPrecondition("Cannot delete a non-empty directory.");
  }
  // recursive=1 is safe here: the directory was just verified to be empty
  // (modulo the race noted above).
  if (hdfs_->hdfsDelete(fs, TranslateName(dir).c_str(),
                        /*recursive=*/1) != 0) {
    return IOError(dir, errno);
  }
  return Status::OK();
}
451
GetFileSize(const string & fname,uint64 * size)452 Status HadoopFileSystem::GetFileSize(const string& fname, uint64* size) {
453 hdfsFS fs = nullptr;
454 TF_RETURN_IF_ERROR(Connect(fname, &fs));
455
456 hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str());
457 if (info == nullptr) {
458 return IOError(fname, errno);
459 }
460 *size = static_cast<uint64>(info->mSize);
461 hdfs_->hdfsFreeFileInfo(info, 1);
462 return Status::OK();
463 }
464
RenameFile(const string & src,const string & target)465 Status HadoopFileSystem::RenameFile(const string& src, const string& target) {
466 hdfsFS fs = nullptr;
467 TF_RETURN_IF_ERROR(Connect(src, &fs));
468
469 if (hdfs_->hdfsExists(fs, TranslateName(target).c_str()) == 0 &&
470 hdfs_->hdfsDelete(fs, TranslateName(target).c_str(),
471 /*recursive=*/0) != 0) {
472 return IOError(target, errno);
473 }
474
475 if (hdfs_->hdfsRename(fs, TranslateName(src).c_str(),
476 TranslateName(target).c_str()) != 0) {
477 return IOError(src, errno);
478 }
479 return Status::OK();
480 }
481
Stat(const string & fname,FileStatistics * stats)482 Status HadoopFileSystem::Stat(const string& fname, FileStatistics* stats) {
483 hdfsFS fs = nullptr;
484 TF_RETURN_IF_ERROR(Connect(fname, &fs));
485
486 hdfsFileInfo* info = hdfs_->hdfsGetPathInfo(fs, TranslateName(fname).c_str());
487 if (info == nullptr) {
488 return IOError(fname, errno);
489 }
490 stats->length = static_cast<int64>(info->mSize);
491 stats->mtime_nsec = static_cast<int64>(info->mLastMod) * 1e9;
492 stats->is_directory = info->mKind == kObjectKindDirectory;
493 hdfs_->hdfsFreeFileInfo(info, 1);
494 return Status::OK();
495 }
496
497 REGISTER_FILE_SYSTEM("hdfs", HadoopFileSystem);
498 REGISTER_FILE_SYSTEM("viewfs", HadoopFileSystem);
499
500 } // namespace tensorflow
501