1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/data/service/journal.h"
17 
18 #include "absl/memory/memory.h"
19 #include "tensorflow/core/data/service/journal.pb.h"
20 #include "tensorflow/core/lib/io/record_reader.h"
21 #include "tensorflow/core/lib/io/record_writer.h"
22 #include "tensorflow/core/platform/env.h"
23 #include "tensorflow/core/platform/errors.h"
24 #include "tensorflow/core/platform/path.h"
25 #include "tensorflow/core/platform/regexp.h"
26 
27 namespace tensorflow {
28 namespace data {
29 
30 namespace {
31 constexpr StringPiece kJournal = "journal";
32 
ParseSequenceNumber(const std::string & journal_file,int64 * sequence_number)33 Status ParseSequenceNumber(const std::string& journal_file,
34                            int64* sequence_number) {
35   if (!RE2::FullMatch(journal_file, ".*_(\\d+)", sequence_number)) {
36     return errors::InvalidArgument("Failed to parse journal file name: ",
37                                    journal_file);
38   }
39   return Status::OK();
40 }
41 }  // namespace
42 
DataServiceJournalFile(const std::string & journal_dir,int64 sequence_number)43 std::string DataServiceJournalFile(const std::string& journal_dir,
44                                    int64 sequence_number) {
45   return io::JoinPath(journal_dir,
46                       absl::StrCat(kJournal, "_", sequence_number));
47 }
48 
FileJournalWriter(Env * env,const std::string & journal_dir)49 FileJournalWriter::FileJournalWriter(Env* env, const std::string& journal_dir)
50     : env_(env), journal_dir_(journal_dir) {}
51 
EnsureInitialized()52 Status FileJournalWriter::EnsureInitialized() {
53   if (writer_) {
54     return Status::OK();
55   }
56   std::vector<std::string> journal_files;
57   TF_RETURN_IF_ERROR(env_->RecursivelyCreateDir(journal_dir_));
58   TF_RETURN_IF_ERROR(env_->GetChildren(journal_dir_, &journal_files));
59   int64 latest_sequence_number = -1;
60   for (const auto& file : journal_files) {
61     int64 sequence_number;
62     TF_RETURN_IF_ERROR(ParseSequenceNumber(file, &sequence_number));
63     latest_sequence_number = std::max(latest_sequence_number, sequence_number);
64   }
65   std::string journal_file =
66       DataServiceJournalFile(journal_dir_, latest_sequence_number + 1);
67   TF_RETURN_IF_ERROR(env_->NewAppendableFile(journal_file, &file_));
68   writer_ = absl::make_unique<io::RecordWriter>(file_.get());
69   VLOG(1) << "Created journal writer to write to " << journal_file;
70   return Status::OK();
71 }
72 
Write(const Update & update)73 Status FileJournalWriter::Write(const Update& update) {
74   TF_RETURN_IF_ERROR(EnsureInitialized());
75   std::string s = update.SerializeAsString();
76   if (s.empty()) {
77     return errors::Internal("Failed to serialize update ", update.DebugString(),
78                             " to string");
79   }
80   TF_RETURN_IF_ERROR(writer_->WriteRecord(s));
81   TF_RETURN_IF_ERROR(writer_->Flush());
82   TF_RETURN_IF_ERROR(file_->Sync());
83   if (VLOG_IS_ON(4)) {
84     VLOG(4) << "Wrote journal entry: " << update.DebugString();
85   }
86   return Status::OK();
87 }
88 
FileJournalReader(Env * env,StringPiece journal_dir)89 FileJournalReader::FileJournalReader(Env* env, StringPiece journal_dir)
90     : env_(env), journal_dir_(journal_dir) {}
91 
EnsureInitialized()92 Status FileJournalReader::EnsureInitialized() {
93   if (reader_) {
94     return Status::OK();
95   }
96   return UpdateFile(DataServiceJournalFile(journal_dir_, 0));
97 }
98 
Read(Update & update,bool & end_of_journal)99 Status FileJournalReader::Read(Update& update, bool& end_of_journal) {
100   TF_RETURN_IF_ERROR(EnsureInitialized());
101   while (true) {
102     tstring record;
103     Status s = reader_->ReadRecord(&offset_, &record);
104     if (errors::IsOutOfRange(s)) {
105       sequence_number_++;
106       std::string next_journal_file =
107           DataServiceJournalFile(journal_dir_, sequence_number_);
108       if (errors::IsNotFound(env_->FileExists(next_journal_file))) {
109         VLOG(3) << "Next journal file " << next_journal_file
110                 << " does not exist. End of journal reached.";
111         end_of_journal = true;
112         return Status::OK();
113       }
114       TF_RETURN_IF_ERROR(UpdateFile(next_journal_file));
115       continue;
116     }
117     TF_RETURN_IF_ERROR(s);
118     if (!update.ParseFromString(record)) {
119       return errors::DataLoss("Failed to parse journal record.");
120     }
121     if (VLOG_IS_ON(4)) {
122       VLOG(4) << "Read journal entry: " << update.DebugString();
123     }
124     end_of_journal = false;
125     return Status::OK();
126   }
127 }
128 
UpdateFile(const std::string & filename)129 Status FileJournalReader::UpdateFile(const std::string& filename) {
130   VLOG(1) << "Reading from journal file " << filename;
131   TF_RETURN_IF_ERROR(env_->NewRandomAccessFile(filename, &file_));
132   reader_ = absl::make_unique<io::RecordReader>(file_.get());
133   offset_ = 0;
134   return Status::OK();
135 }
136 
137 }  // namespace data
138 }  // namespace tensorflow
139