1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #ifndef TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_
16 #define TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_
17 
18 #include "tensorflow/core/data/service/journal.pb.h"
19 #include "tensorflow/core/lib/core/status.h"
20 #include "tensorflow/core/lib/io/record_reader.h"
21 #include "tensorflow/core/lib/io/record_writer.h"
22 #include "tensorflow/core/platform/env.h"
23 
24 namespace tensorflow {
25 namespace data {
26 
27 // Returns the location of the journal file within the journal directory.
28 std::string DataServiceJournalFile(const std::string& journal_dir,
29                                    int64 sequence_number);
30 
31 // Interface for writing to a journal.
32 class JournalWriter {
33  public:
34   virtual ~JournalWriter() = default;
35   // Writes and syncs an update to the journal.
36   virtual Status Write(const Update& update) = 0;
37   // Initializes the writer if it is not yet initialized.
38   virtual Status EnsureInitialized() = 0;
39 };
40 
41 // FileJournalWriter is not thread-safe, requiring external synchronization when
42 // used by multiple threads.
43 //
44 // FileJournalWriter writes journal files to a configured journal directory. The
45 // directory is laid out in the following format:
46 //
47 // journal_dir/
48 //   journal_0
49 //   journal_1
50 //   ...
51 //
52 // When the writer is created, it lists the directory to find the next available
53 // journal file name. For example, if the journal directory contains
54 // "journal_0", "journal_1", and "journal_2", the writer will write to
55 // "journal_3". The writer will flush updates as they are written, so that they
56 // can be stored durably in case of machine failure.
57 class FileJournalWriter : public JournalWriter {
58  public:
59   // Creates a journal writer to write to the given journal directory.
60   // If there is already journal data there, the journal writer will append to
61   // the existing journal.
62   explicit FileJournalWriter(Env* env, const std::string& journal_dir);
63   FileJournalWriter(const FileJournalWriter&) = delete;
64   FileJournalWriter& operator=(const FileJournalWriter&) = delete;
65 
66   Status Write(const Update& update) override;
67   Status EnsureInitialized() override;
68 
69  private:
70   Env* env_;
71   const std::string journal_dir_;
72   std::unique_ptr<WritableFile> file_;
73   std::unique_ptr<io::RecordWriter> writer_;
74 };
75 
76 // Interface for reading from a journal.
77 class JournalReader {
78  public:
79   virtual ~JournalReader() = default;
80   // Reads the next update from the journal. Sets `end_of_journal=true` if
81   // there are no more updates left in the journal.
82   virtual Status Read(Update& update, bool& end_of_journal) = 0;
83 };
84 
// FileJournalReader is not thread-safe, requiring external synchronization when
// used by multiple threads.
87 //
88 // The journal reader reads through all journal files in the configured journal
89 // directory, in order of their sequence numbers. See FileJournalWriter above.
90 class FileJournalReader : public JournalReader {
91  public:
92   explicit FileJournalReader(Env* env, StringPiece journal_dir);
93   FileJournalReader(const FileJournalReader&) = delete;
94   FileJournalReader& operator=(const FileJournalReader&) = delete;
95 
96   Status Read(Update& update, bool& end_of_journal) override;
97 
98  private:
99   // Initializes the reader if it is not yet initialized.
100   Status EnsureInitialized();
101   // Updates the `FileJournalReader` to read from a new file.
102   Status UpdateFile(const std::string& filename);
103 
104   Env* env_;
105   const std::string journal_dir_;
106   // Sequence number of current journal file.
107   int64 sequence_number_ = 0;
108   // Current offset into `file_`.
109   uint64 offset_ = 0;
110   std::unique_ptr<RandomAccessFile> file_;
111   std::unique_ptr<io::RecordReader> reader_;
112 };
113 
114 }  // namespace data
115 }  // namespace tensorflow
116 
117 #endif  // TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_
118