1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 #ifndef TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_ 16 #define TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_ 17 18 #include "tensorflow/core/data/service/journal.pb.h" 19 #include "tensorflow/core/lib/core/status.h" 20 #include "tensorflow/core/lib/io/record_reader.h" 21 #include "tensorflow/core/lib/io/record_writer.h" 22 #include "tensorflow/core/platform/env.h" 23 24 namespace tensorflow { 25 namespace data { 26 27 // Returns the location of the journal file within the journal directory. 28 std::string DataServiceJournalFile(const std::string& journal_dir, 29 int64 sequence_number); 30 31 // Interface for writing to a journal. 32 class JournalWriter { 33 public: 34 virtual ~JournalWriter() = default; 35 // Writes and syncs an update to the journal. 36 virtual Status Write(const Update& update) = 0; 37 // Initializes the writer if it is not yet initialized. 38 virtual Status EnsureInitialized() = 0; 39 }; 40 41 // FileJournalWriter is not thread-safe, requiring external synchronization when 42 // used by multiple threads. 43 // 44 // FileJournalWriter writes journal files to a configured journal directory. The 45 // directory is laid out in the following format: 46 // 47 // journal_dir/ 48 // journal_0 49 // journal_1 50 // ... 
//
// When the writer is created, it lists the directory to find the next available
// journal file name. For example, if the journal directory contains
// "journal_0", "journal_1", and "journal_2", the writer will write to
// "journal_3". The writer will flush updates as they are written, so that they
// can be stored durably in case of machine failure.
//
// NOTE(review): since the class exposes EnsureInitialized(), the directory
// listing may actually happen lazily on first use rather than in the
// constructor; confirm against the implementation.
class FileJournalWriter : public JournalWriter {
 public:
  // Creates a journal writer to write to the given journal directory.
  // If there is already journal data there, the journal writer will append to
  // the existing journal.
  explicit FileJournalWriter(Env* env, const std::string& journal_dir);
  // Not copyable.
  FileJournalWriter(const FileJournalWriter&) = delete;
  FileJournalWriter& operator=(const FileJournalWriter&) = delete;

  // Writes and syncs an update to the journal.
  Status Write(const Update& update) override;
  // Initializes the writer if it is not yet initialized.
  Status EnsureInitialized() override;

 private:
  // Environment handle supplied at construction. NOTE(review): held as a raw
  // pointer, so presumably not owned and expected to outlive the writer;
  // confirm with callers.
  Env* env_;
  // Directory that journal files are written to.
  const std::string journal_dir_;
  // NOTE(review): presumably the journal file currently being written and the
  // record writer layered on top of it; confirm in the implementation.
  std::unique_ptr<WritableFile> file_;
  std::unique_ptr<io::RecordWriter> writer_;
};

// Interface for reading from a journal.
class JournalReader {
 public:
  virtual ~JournalReader() = default;
  // Reads the next update from the journal. Sets `end_of_journal=true` if
  // there are no more updates left in the journal.
  //
  // Both `update` and `end_of_journal` are output parameters.
  virtual Status Read(Update& update, bool& end_of_journal) = 0;
};

// FileJournalReader is not thread-safe, requiring external synchronization when
// used by multiple threads.
//
// The journal reader reads through all journal files in the configured journal
// directory, in order of their sequence numbers. See FileJournalWriter above.
class FileJournalReader : public JournalReader {
 public:
  // Creates a journal reader for the journal stored in `journal_dir`.
  explicit FileJournalReader(Env* env, StringPiece journal_dir);
  // Not copyable.
  FileJournalReader(const FileJournalReader&) = delete;
  FileJournalReader& operator=(const FileJournalReader&) = delete;

  // Reads the next update into `update`; `end_of_journal` is an output flag
  // reporting whether the journal has been exhausted (see JournalReader::Read).
  Status Read(Update& update, bool& end_of_journal) override;

 private:
  // Initializes the reader if it is not yet initialized.
  Status EnsureInitialized();
  // Updates the `FileJournalReader` to read from a new file.
  Status UpdateFile(const std::string& filename);

  // Environment handle supplied at construction. NOTE(review): held as a raw
  // pointer, so presumably not owned and expected to outlive the reader;
  // confirm with callers.
  Env* env_;
  // Directory containing the journal files to read.
  const std::string journal_dir_;
  // Sequence number of current journal file.
  int64 sequence_number_ = 0;
  // Current offset into `file_`.
  uint64 offset_ = 0;
  // The journal file currently being read (`offset_` indexes into it).
  std::unique_ptr<RandomAccessFile> file_;
  // NOTE(review): presumably the record reader layered on top of `file_`;
  // confirm in the implementation.
  std::unique_ptr<io::RecordReader> reader_;
};

}  // namespace data
}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_