1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/data/service/journal.h"
17
18 #include "absl/memory/memory.h"
19 #include "tensorflow/core/data/service/journal.pb.h"
20 #include "tensorflow/core/lib/io/record_reader.h"
21 #include "tensorflow/core/lib/io/record_writer.h"
22 #include "tensorflow/core/platform/env.h"
23 #include "tensorflow/core/platform/errors.h"
24 #include "tensorflow/core/platform/path.h"
25 #include "tensorflow/core/platform/regexp.h"
26
27 namespace tensorflow {
28 namespace data {
29
30 namespace {
31 constexpr StringPiece kJournal = "journal";
32
ParseSequenceNumber(const std::string & journal_file,int64 * sequence_number)33 Status ParseSequenceNumber(const std::string& journal_file,
34 int64* sequence_number) {
35 if (!RE2::FullMatch(journal_file, ".*_(\\d+)", sequence_number)) {
36 return errors::InvalidArgument("Failed to parse journal file name: ",
37 journal_file);
38 }
39 return Status::OK();
40 }
41 } // namespace
42
DataServiceJournalFile(const std::string & journal_dir,int64 sequence_number)43 std::string DataServiceJournalFile(const std::string& journal_dir,
44 int64 sequence_number) {
45 return io::JoinPath(journal_dir,
46 absl::StrCat(kJournal, "_", sequence_number));
47 }
48
FileJournalWriter(Env * env,const std::string & journal_dir)49 FileJournalWriter::FileJournalWriter(Env* env, const std::string& journal_dir)
50 : env_(env), journal_dir_(journal_dir) {}
51
EnsureInitialized()52 Status FileJournalWriter::EnsureInitialized() {
53 if (writer_) {
54 return Status::OK();
55 }
56 std::vector<std::string> journal_files;
57 TF_RETURN_IF_ERROR(env_->RecursivelyCreateDir(journal_dir_));
58 TF_RETURN_IF_ERROR(env_->GetChildren(journal_dir_, &journal_files));
59 int64 latest_sequence_number = -1;
60 for (const auto& file : journal_files) {
61 int64 sequence_number;
62 TF_RETURN_IF_ERROR(ParseSequenceNumber(file, &sequence_number));
63 latest_sequence_number = std::max(latest_sequence_number, sequence_number);
64 }
65 std::string journal_file =
66 DataServiceJournalFile(journal_dir_, latest_sequence_number + 1);
67 TF_RETURN_IF_ERROR(env_->NewAppendableFile(journal_file, &file_));
68 writer_ = absl::make_unique<io::RecordWriter>(file_.get());
69 VLOG(1) << "Created journal writer to write to " << journal_file;
70 return Status::OK();
71 }
72
Write(const Update & update)73 Status FileJournalWriter::Write(const Update& update) {
74 TF_RETURN_IF_ERROR(EnsureInitialized());
75 std::string s = update.SerializeAsString();
76 if (s.empty()) {
77 return errors::Internal("Failed to serialize update ", update.DebugString(),
78 " to string");
79 }
80 TF_RETURN_IF_ERROR(writer_->WriteRecord(s));
81 TF_RETURN_IF_ERROR(writer_->Flush());
82 TF_RETURN_IF_ERROR(file_->Sync());
83 if (VLOG_IS_ON(4)) {
84 VLOG(4) << "Wrote journal entry: " << update.DebugString();
85 }
86 return Status::OK();
87 }
88
// Creates a reader over the journal files stored in `journal_dir`, using `env`
// for file system access. No file is opened here; the first journal file is
// opened by EnsureInitialized() on the first call to Read().
FileJournalReader::FileJournalReader(Env* env, StringPiece journal_dir)
    : env_(env), journal_dir_(journal_dir) {}
91
EnsureInitialized()92 Status FileJournalReader::EnsureInitialized() {
93 if (reader_) {
94 return Status::OK();
95 }
96 return UpdateFile(DataServiceJournalFile(journal_dir_, 0));
97 }
98
Read(Update & update,bool & end_of_journal)99 Status FileJournalReader::Read(Update& update, bool& end_of_journal) {
100 TF_RETURN_IF_ERROR(EnsureInitialized());
101 while (true) {
102 tstring record;
103 Status s = reader_->ReadRecord(&offset_, &record);
104 if (errors::IsOutOfRange(s)) {
105 sequence_number_++;
106 std::string next_journal_file =
107 DataServiceJournalFile(journal_dir_, sequence_number_);
108 if (errors::IsNotFound(env_->FileExists(next_journal_file))) {
109 VLOG(3) << "Next journal file " << next_journal_file
110 << " does not exist. End of journal reached.";
111 end_of_journal = true;
112 return Status::OK();
113 }
114 TF_RETURN_IF_ERROR(UpdateFile(next_journal_file));
115 continue;
116 }
117 TF_RETURN_IF_ERROR(s);
118 if (!update.ParseFromString(record)) {
119 return errors::DataLoss("Failed to parse journal record.");
120 }
121 if (VLOG_IS_ON(4)) {
122 VLOG(4) << "Read journal entry: " << update.DebugString();
123 }
124 end_of_journal = false;
125 return Status::OK();
126 }
127 }
128
UpdateFile(const std::string & filename)129 Status FileJournalReader::UpdateFile(const std::string& filename) {
130 VLOG(1) << "Reading from journal file " << filename;
131 TF_RETURN_IF_ERROR(env_->NewRandomAccessFile(filename, &file_));
132 reader_ = absl::make_unique<io::RecordReader>(file_.get());
133 offset_ = 0;
134 return Status::OK();
135 }
136
137 } // namespace data
138 } // namespace tensorflow
139