1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include <memory>
17 #include <string>
18 #include <vector>
19 
20 #include "pybind11/pybind11.h"
21 #include "pybind11/stl.h"
22 #include "tensorflow/core/lib/core/error_codes.pb.h"
23 #include "tensorflow/core/lib/core/errors.h"
24 #include "tensorflow/core/lib/core/status.h"
25 #include "tensorflow/core/lib/io/buffered_inputstream.h"
26 #include "tensorflow/core/lib/io/random_inputstream.h"
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/file_statistics.h"
29 #include "tensorflow/core/platform/file_system.h"
30 #include "tensorflow/core/platform/stringpiece.h"
31 #include "tensorflow/core/platform/tstring.h"
32 #include "tensorflow/python/lib/core/pybind11_absl.h"
33 #include "tensorflow/python/lib/core/pybind11_status.h"
34 
35 namespace tensorflow {
36 struct PyTransactionToken {
37   TransactionToken* token_;
38 };
39 
TokenFromPyToken(PyTransactionToken * t)40 inline TransactionToken* TokenFromPyToken(PyTransactionToken* t) {
41   return (t ? t->token_ : nullptr);
42 }
43 }  // namespace tensorflow
44 
45 namespace {
46 namespace py = pybind11;
47 
PYBIND11_MODULE(_pywrap_file_io,m)48 PYBIND11_MODULE(_pywrap_file_io, m) {
49   using tensorflow::PyTransactionToken;
50   using tensorflow::TransactionToken;
51   py::class_<PyTransactionToken>(m, "TransactionToken")
52       .def("__repr__", [](const PyTransactionToken* t) {
53         if (t->token_) {
54           return std::string(t->token_->owner->DecodeTransaction(t->token_));
55         }
56         return std::string("Invalid token!");
57       });
58 
59   m.def(
60       "FileExists",
61       [](const std::string& filename, PyTransactionToken* token) {
62         tensorflow::Status status;
63         {
64           py::gil_scoped_release release;
65           status = tensorflow::Env::Default()->FileExists(filename);
66         }
67         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
68       },
69       py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
70   m.def(
71       "DeleteFile",
72       [](const std::string& filename, PyTransactionToken* token) {
73         py::gil_scoped_release release;
74         tensorflow::Status status =
75             tensorflow::Env::Default()->DeleteFile(filename);
76         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
77       },
78       py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
79   m.def(
80       "ReadFileToString",
81       [](const std::string& filename, PyTransactionToken* token) {
82         std::string data;
83         py::gil_scoped_release release;
84         const auto status =
85             ReadFileToString(tensorflow::Env::Default(), filename, &data);
86         pybind11::gil_scoped_acquire acquire;
87         tensorflow::MaybeRaiseRegisteredFromStatus(status);
88         return py::bytes(data);
89       },
90       py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
91   m.def(
92       "WriteStringToFile",
93       [](const std::string& filename, tensorflow::StringPiece data,
94          PyTransactionToken* token) {
95         py::gil_scoped_release release;
96         const auto status =
97             WriteStringToFile(tensorflow::Env::Default(), filename, data);
98         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
99       },
100       py::arg("filename"), py::arg("data"),
101       py::arg("token") = (PyTransactionToken*)nullptr);
102   m.def(
103       "GetChildren",
104       [](const std::string& dirname, PyTransactionToken* token) {
105         std::vector<std::string> results;
106         py::gil_scoped_release release;
107         const auto status =
108             tensorflow::Env::Default()->GetChildren(dirname, &results);
109         pybind11::gil_scoped_acquire acquire;
110         tensorflow::MaybeRaiseRegisteredFromStatus(status);
111         return results;
112       },
113       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
114   m.def(
115       "GetMatchingFiles",
116       [](const std::string& pattern, PyTransactionToken* token) {
117         std::vector<std::string> results;
118         py::gil_scoped_release release;
119         const auto status =
120             tensorflow::Env::Default()->GetMatchingPaths(pattern, &results);
121         pybind11::gil_scoped_acquire acquire;
122         tensorflow::MaybeRaiseRegisteredFromStatus(status);
123         return results;
124       },
125       py::arg("pattern"), py::arg("token") = (PyTransactionToken*)nullptr);
126   m.def(
127       "CreateDir",
128       [](const std::string& dirname, PyTransactionToken* token) {
129         py::gil_scoped_release release;
130         const auto status = tensorflow::Env::Default()->CreateDir(dirname);
131         if (tensorflow::errors::IsAlreadyExists(status)) {
132           return;
133         }
134         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
135       },
136       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
137   m.def(
138       "RecursivelyCreateDir",
139       [](const std::string& dirname, PyTransactionToken* token) {
140         py::gil_scoped_release release;
141         const auto status =
142             tensorflow::Env::Default()->RecursivelyCreateDir(dirname);
143         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
144       },
145       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
146   m.def(
147       "CopyFile",
148       [](const std::string& src, const std::string& target, bool overwrite,
149          PyTransactionToken* token) {
150         py::gil_scoped_release release;
151         auto* env = tensorflow::Env::Default();
152         tensorflow::Status status;
153         if (!overwrite && env->FileExists(target).ok()) {
154           status = tensorflow::errors::AlreadyExists("file already exists");
155         } else {
156           status = env->CopyFile(src, target);
157         }
158         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
159       },
160       py::arg("src"), py::arg("target"), py::arg("overwrite"),
161       py::arg("token") = (PyTransactionToken*)nullptr);
162   m.def(
163       "RenameFile",
164       [](const std::string& src, const std::string& target, bool overwrite,
165          PyTransactionToken* token) {
166         py::gil_scoped_release release;
167         auto* env = tensorflow::Env::Default();
168         tensorflow::Status status;
169         if (!overwrite && env->FileExists(target).ok()) {
170           status = tensorflow::errors::AlreadyExists("file already exists");
171         } else {
172           status = env->RenameFile(src, target);
173         }
174         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
175       },
176       py::arg("src"), py::arg("target"), py::arg("overwrite"),
177       py::arg("token") = (PyTransactionToken*)nullptr);
178   m.def(
179       "DeleteRecursively",
180       [](const std::string& dirname, PyTransactionToken* token) {
181         py::gil_scoped_release release;
182         tensorflow::int64 undeleted_files;
183         tensorflow::int64 undeleted_dirs;
184         auto status = tensorflow::Env::Default()->DeleteRecursively(
185             dirname, &undeleted_files, &undeleted_dirs);
186         if (status.ok() && (undeleted_files > 0 || undeleted_dirs > 0)) {
187           status = tensorflow::errors::PermissionDenied(
188               "could not fully delete dir");
189         }
190         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
191       },
192       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
193   m.def(
194       "IsDirectory",
195       [](const std::string& dirname, PyTransactionToken* token) {
196         py::gil_scoped_release release;
197         const auto status = tensorflow::Env::Default()->IsDirectory(dirname);
198         // FAILED_PRECONDITION response means path exists but isn't a dir.
199         if (tensorflow::errors::IsFailedPrecondition(status)) {
200           return false;
201         }
202 
203         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
204         return true;
205       },
206       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
207   m.def("HasAtomicMove", [](const std::string& path) {
208     py::gil_scoped_release release;
209     bool has_atomic_move;
210     const auto status =
211         tensorflow::Env::Default()->HasAtomicMove(path, &has_atomic_move);
212     tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
213     return has_atomic_move;
214   });
215 
216   py::class_<tensorflow::FileStatistics>(m, "FileStatistics")
217       .def_readonly("length", &tensorflow::FileStatistics::length)
218       .def_readonly("mtime_nsec", &tensorflow::FileStatistics::mtime_nsec)
219       .def_readonly("is_directory", &tensorflow::FileStatistics::is_directory);
220 
221   m.def(
222       "Stat",
223       [](const std::string& filename, PyTransactionToken* token) {
224         py::gil_scoped_release release;
225         std::unique_ptr<tensorflow::FileStatistics> self(
226             new tensorflow::FileStatistics);
227         const auto status =
228             tensorflow::Env::Default()->Stat(filename, self.get());
229         py::gil_scoped_acquire acquire;
230         tensorflow::MaybeRaiseRegisteredFromStatus(status);
231         return self.release();
232       },
233       py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
234 
235   using tensorflow::WritableFile;
236   py::class_<WritableFile>(m, "WritableFile")
237       .def(py::init([](const std::string& filename, const std::string& mode,
238                        PyTransactionToken* token) {
239              py::gil_scoped_release release;
240              auto* env = tensorflow::Env::Default();
241              std::unique_ptr<WritableFile> self;
242              const auto status = mode.find('a') == std::string::npos
243                                      ? env->NewWritableFile(filename, &self)
244                                      : env->NewAppendableFile(filename, &self);
245              py::gil_scoped_acquire acquire;
246              tensorflow::MaybeRaiseRegisteredFromStatus(status);
247              return self.release();
248            }),
249            py::arg("filename"), py::arg("mode"),
250            py::arg("token") = (PyTransactionToken*)nullptr)
251       .def("append",
252            [](WritableFile* self, tensorflow::StringPiece data) {
253              const auto status = self->Append(data);
254              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
255            })
256       // TODO(slebedev): Make WritableFile::Tell const and change self
257       // to be a reference.
258       .def("tell",
259            [](WritableFile* self) {
260              tensorflow::int64 pos = -1;
261              py::gil_scoped_release release;
262              const auto status = self->Tell(&pos);
263              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
264              return pos;
265            })
266       .def("flush",
267            [](WritableFile* self) {
268              py::gil_scoped_release release;
269              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Flush());
270            })
271       .def("close", [](WritableFile* self) {
272         py::gil_scoped_release release;
273         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Close());
274       });
275 
276   using tensorflow::io::BufferedInputStream;
277   py::class_<BufferedInputStream>(m, "BufferedInputStream")
278       .def(py::init([](const std::string& filename, size_t buffer_size,
279                        PyTransactionToken* token) {
280              py::gil_scoped_release release;
281              std::unique_ptr<tensorflow::RandomAccessFile> file;
282              const auto status =
283                  tensorflow::Env::Default()->NewRandomAccessFile(filename,
284                                                                  &file);
285              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
286              std::unique_ptr<tensorflow::io::RandomAccessInputStream>
287                  input_stream(new tensorflow::io::RandomAccessInputStream(
288                      file.release(),
289                      /*owns_file=*/true));
290              py::gil_scoped_acquire acquire;
291              return new BufferedInputStream(input_stream.release(), buffer_size,
292                                             /*owns_input_stream=*/true);
293            }),
294            py::arg("filename"), py::arg("buffer_size"),
295            py::arg("token") = (PyTransactionToken*)nullptr)
296       .def("read",
297            [](BufferedInputStream* self, tensorflow::int64 bytes_to_read) {
298              py::gil_scoped_release release;
299              tensorflow::tstring result;
300              const auto status = self->ReadNBytes(bytes_to_read, &result);
301              if (!status.ok() && !tensorflow::errors::IsOutOfRange(status)) {
302                result.clear();
303                tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
304              }
305              py::gil_scoped_acquire acquire;
306              return py::bytes(result);
307            })
308       .def("readline",
309            [](BufferedInputStream* self) {
310              py::gil_scoped_release release;
311              auto output = self->ReadLineAsString();
312              py::gil_scoped_acquire acquire;
313              return py::bytes(output);
314            })
315       .def("seek",
316            [](BufferedInputStream* self, tensorflow::int64 pos) {
317              py::gil_scoped_release release;
318              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Seek(pos));
319            })
320       .def("tell", [](BufferedInputStream* self) {
321         py::gil_scoped_release release;
322         return self->Tell();
323       });
324 }
325 }  // namespace
326