1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include <memory>
17 #include <string>
18 #include <vector>
19
20 #include "pybind11/pybind11.h"
21 #include "pybind11/stl.h"
22 #include "tensorflow/core/lib/core/error_codes.pb.h"
23 #include "tensorflow/core/lib/core/errors.h"
24 #include "tensorflow/core/lib/core/status.h"
25 #include "tensorflow/core/lib/io/buffered_inputstream.h"
26 #include "tensorflow/core/lib/io/random_inputstream.h"
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/file_statistics.h"
29 #include "tensorflow/core/platform/file_system.h"
30 #include "tensorflow/core/platform/stringpiece.h"
31 #include "tensorflow/core/platform/tstring.h"
32 #include "tensorflow/python/lib/core/pybind11_absl.h"
33 #include "tensorflow/python/lib/core/pybind11_status.h"
34
35 namespace tensorflow {
36 struct PyTransactionToken {
37 TransactionToken* token_;
38 };
39
TokenFromPyToken(PyTransactionToken * t)40 inline TransactionToken* TokenFromPyToken(PyTransactionToken* t) {
41 return (t ? t->token_ : nullptr);
42 }
43 } // namespace tensorflow
44
45 namespace {
46 namespace py = pybind11;
47
PYBIND11_MODULE(_pywrap_file_io,m)48 PYBIND11_MODULE(_pywrap_file_io, m) {
49 using tensorflow::PyTransactionToken;
50 using tensorflow::TransactionToken;
51 py::class_<PyTransactionToken>(m, "TransactionToken")
52 .def("__repr__", [](const PyTransactionToken* t) {
53 if (t->token_) {
54 return std::string(t->token_->owner->DecodeTransaction(t->token_));
55 }
56 return std::string("Invalid token!");
57 });
58
59 m.def(
60 "FileExists",
61 [](const std::string& filename, PyTransactionToken* token) {
62 tensorflow::Status status;
63 {
64 py::gil_scoped_release release;
65 status = tensorflow::Env::Default()->FileExists(filename);
66 }
67 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
68 },
69 py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
70 m.def(
71 "DeleteFile",
72 [](const std::string& filename, PyTransactionToken* token) {
73 py::gil_scoped_release release;
74 tensorflow::Status status =
75 tensorflow::Env::Default()->DeleteFile(filename);
76 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
77 },
78 py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
79 m.def(
80 "ReadFileToString",
81 [](const std::string& filename, PyTransactionToken* token) {
82 std::string data;
83 py::gil_scoped_release release;
84 const auto status =
85 ReadFileToString(tensorflow::Env::Default(), filename, &data);
86 pybind11::gil_scoped_acquire acquire;
87 tensorflow::MaybeRaiseRegisteredFromStatus(status);
88 return py::bytes(data);
89 },
90 py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
91 m.def(
92 "WriteStringToFile",
93 [](const std::string& filename, tensorflow::StringPiece data,
94 PyTransactionToken* token) {
95 py::gil_scoped_release release;
96 const auto status =
97 WriteStringToFile(tensorflow::Env::Default(), filename, data);
98 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
99 },
100 py::arg("filename"), py::arg("data"),
101 py::arg("token") = (PyTransactionToken*)nullptr);
102 m.def(
103 "GetChildren",
104 [](const std::string& dirname, PyTransactionToken* token) {
105 std::vector<std::string> results;
106 py::gil_scoped_release release;
107 const auto status =
108 tensorflow::Env::Default()->GetChildren(dirname, &results);
109 pybind11::gil_scoped_acquire acquire;
110 tensorflow::MaybeRaiseRegisteredFromStatus(status);
111 return results;
112 },
113 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
114 m.def(
115 "GetMatchingFiles",
116 [](const std::string& pattern, PyTransactionToken* token) {
117 std::vector<std::string> results;
118 py::gil_scoped_release release;
119 const auto status =
120 tensorflow::Env::Default()->GetMatchingPaths(pattern, &results);
121 pybind11::gil_scoped_acquire acquire;
122 tensorflow::MaybeRaiseRegisteredFromStatus(status);
123 return results;
124 },
125 py::arg("pattern"), py::arg("token") = (PyTransactionToken*)nullptr);
126 m.def(
127 "CreateDir",
128 [](const std::string& dirname, PyTransactionToken* token) {
129 py::gil_scoped_release release;
130 const auto status = tensorflow::Env::Default()->CreateDir(dirname);
131 if (tensorflow::errors::IsAlreadyExists(status)) {
132 return;
133 }
134 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
135 },
136 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
137 m.def(
138 "RecursivelyCreateDir",
139 [](const std::string& dirname, PyTransactionToken* token) {
140 py::gil_scoped_release release;
141 const auto status =
142 tensorflow::Env::Default()->RecursivelyCreateDir(dirname);
143 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
144 },
145 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
146 m.def(
147 "CopyFile",
148 [](const std::string& src, const std::string& target, bool overwrite,
149 PyTransactionToken* token) {
150 py::gil_scoped_release release;
151 auto* env = tensorflow::Env::Default();
152 tensorflow::Status status;
153 if (!overwrite && env->FileExists(target).ok()) {
154 status = tensorflow::errors::AlreadyExists("file already exists");
155 } else {
156 status = env->CopyFile(src, target);
157 }
158 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
159 },
160 py::arg("src"), py::arg("target"), py::arg("overwrite"),
161 py::arg("token") = (PyTransactionToken*)nullptr);
162 m.def(
163 "RenameFile",
164 [](const std::string& src, const std::string& target, bool overwrite,
165 PyTransactionToken* token) {
166 py::gil_scoped_release release;
167 auto* env = tensorflow::Env::Default();
168 tensorflow::Status status;
169 if (!overwrite && env->FileExists(target).ok()) {
170 status = tensorflow::errors::AlreadyExists("file already exists");
171 } else {
172 status = env->RenameFile(src, target);
173 }
174 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
175 },
176 py::arg("src"), py::arg("target"), py::arg("overwrite"),
177 py::arg("token") = (PyTransactionToken*)nullptr);
178 m.def(
179 "DeleteRecursively",
180 [](const std::string& dirname, PyTransactionToken* token) {
181 py::gil_scoped_release release;
182 tensorflow::int64 undeleted_files;
183 tensorflow::int64 undeleted_dirs;
184 auto status = tensorflow::Env::Default()->DeleteRecursively(
185 dirname, &undeleted_files, &undeleted_dirs);
186 if (status.ok() && (undeleted_files > 0 || undeleted_dirs > 0)) {
187 status = tensorflow::errors::PermissionDenied(
188 "could not fully delete dir");
189 }
190 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
191 },
192 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
193 m.def(
194 "IsDirectory",
195 [](const std::string& dirname, PyTransactionToken* token) {
196 py::gil_scoped_release release;
197 const auto status = tensorflow::Env::Default()->IsDirectory(dirname);
198 // FAILED_PRECONDITION response means path exists but isn't a dir.
199 if (tensorflow::errors::IsFailedPrecondition(status)) {
200 return false;
201 }
202
203 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
204 return true;
205 },
206 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
207 m.def("HasAtomicMove", [](const std::string& path) {
208 py::gil_scoped_release release;
209 bool has_atomic_move;
210 const auto status =
211 tensorflow::Env::Default()->HasAtomicMove(path, &has_atomic_move);
212 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
213 return has_atomic_move;
214 });
215
216 py::class_<tensorflow::FileStatistics>(m, "FileStatistics")
217 .def_readonly("length", &tensorflow::FileStatistics::length)
218 .def_readonly("mtime_nsec", &tensorflow::FileStatistics::mtime_nsec)
219 .def_readonly("is_directory", &tensorflow::FileStatistics::is_directory);
220
221 m.def(
222 "Stat",
223 [](const std::string& filename, PyTransactionToken* token) {
224 py::gil_scoped_release release;
225 std::unique_ptr<tensorflow::FileStatistics> self(
226 new tensorflow::FileStatistics);
227 const auto status =
228 tensorflow::Env::Default()->Stat(filename, self.get());
229 py::gil_scoped_acquire acquire;
230 tensorflow::MaybeRaiseRegisteredFromStatus(status);
231 return self.release();
232 },
233 py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
234
235 using tensorflow::WritableFile;
236 py::class_<WritableFile>(m, "WritableFile")
237 .def(py::init([](const std::string& filename, const std::string& mode,
238 PyTransactionToken* token) {
239 py::gil_scoped_release release;
240 auto* env = tensorflow::Env::Default();
241 std::unique_ptr<WritableFile> self;
242 const auto status = mode.find('a') == std::string::npos
243 ? env->NewWritableFile(filename, &self)
244 : env->NewAppendableFile(filename, &self);
245 py::gil_scoped_acquire acquire;
246 tensorflow::MaybeRaiseRegisteredFromStatus(status);
247 return self.release();
248 }),
249 py::arg("filename"), py::arg("mode"),
250 py::arg("token") = (PyTransactionToken*)nullptr)
251 .def("append",
252 [](WritableFile* self, tensorflow::StringPiece data) {
253 const auto status = self->Append(data);
254 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
255 })
256 // TODO(slebedev): Make WritableFile::Tell const and change self
257 // to be a reference.
258 .def("tell",
259 [](WritableFile* self) {
260 tensorflow::int64 pos = -1;
261 py::gil_scoped_release release;
262 const auto status = self->Tell(&pos);
263 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
264 return pos;
265 })
266 .def("flush",
267 [](WritableFile* self) {
268 py::gil_scoped_release release;
269 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Flush());
270 })
271 .def("close", [](WritableFile* self) {
272 py::gil_scoped_release release;
273 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Close());
274 });
275
276 using tensorflow::io::BufferedInputStream;
277 py::class_<BufferedInputStream>(m, "BufferedInputStream")
278 .def(py::init([](const std::string& filename, size_t buffer_size,
279 PyTransactionToken* token) {
280 py::gil_scoped_release release;
281 std::unique_ptr<tensorflow::RandomAccessFile> file;
282 const auto status =
283 tensorflow::Env::Default()->NewRandomAccessFile(filename,
284 &file);
285 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
286 std::unique_ptr<tensorflow::io::RandomAccessInputStream>
287 input_stream(new tensorflow::io::RandomAccessInputStream(
288 file.release(),
289 /*owns_file=*/true));
290 py::gil_scoped_acquire acquire;
291 return new BufferedInputStream(input_stream.release(), buffer_size,
292 /*owns_input_stream=*/true);
293 }),
294 py::arg("filename"), py::arg("buffer_size"),
295 py::arg("token") = (PyTransactionToken*)nullptr)
296 .def("read",
297 [](BufferedInputStream* self, tensorflow::int64 bytes_to_read) {
298 py::gil_scoped_release release;
299 tensorflow::tstring result;
300 const auto status = self->ReadNBytes(bytes_to_read, &result);
301 if (!status.ok() && !tensorflow::errors::IsOutOfRange(status)) {
302 result.clear();
303 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
304 }
305 py::gil_scoped_acquire acquire;
306 return py::bytes(result);
307 })
308 .def("readline",
309 [](BufferedInputStream* self) {
310 py::gil_scoped_release release;
311 auto output = self->ReadLineAsString();
312 py::gil_scoped_acquire acquire;
313 return py::bytes(output);
314 })
315 .def("seek",
316 [](BufferedInputStream* self, tensorflow::int64 pos) {
317 py::gil_scoped_release release;
318 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Seek(pos));
319 })
320 .def("tell", [](BufferedInputStream* self) {
321 py::gil_scoped_release release;
322 return self->Tell();
323 });
324 }
325 } // namespace
326