1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_LIB_IO_RECORD_READER_H_
17 #define TENSORFLOW_CORE_LIB_IO_RECORD_READER_H_
18 
19 #include "tensorflow/core/lib/core/errors.h"
20 #include "tensorflow/core/lib/core/stringpiece.h"
21 #include "tensorflow/core/lib/io/inputstream_interface.h"
22 #if !defined(IS_SLIM_BUILD)
23 #include "tensorflow/core/lib/io/snappy/snappy_compression_options.h"
24 #include "tensorflow/core/lib/io/snappy/snappy_inputstream.h"
25 #include "tensorflow/core/lib/io/zlib_compression_options.h"
26 #include "tensorflow/core/lib/io/zlib_inputstream.h"
27 #endif  // IS_SLIM_BUILD
28 #include "tensorflow/core/platform/macros.h"
29 #include "tensorflow/core/platform/types.h"
30 
31 namespace tensorflow {
32 
33 class RandomAccessFile;
34 
35 namespace io {
36 
37 struct RecordReaderOptions {
38   enum CompressionType {
39     NONE = 0,
40     ZLIB_COMPRESSION = 1,
41     SNAPPY_COMPRESSION = 2
42   };
43   CompressionType compression_type = NONE;
44 
45   // If buffer_size is non-zero, then all reads must be sequential, and no
46   // skipping around is permitted. (Note: this is the same behavior as reading
47   // compressed files.) Consider using SequentialRecordReader.
48   int64 buffer_size = 0;
49 
50   static RecordReaderOptions CreateRecordReaderOptions(
51       const string& compression_type);
52 
53 #if !defined(IS_SLIM_BUILD)
54   // Options specific to compression.
55   ZlibCompressionOptions zlib_options;
56   SnappyCompressionOptions snappy_options;
57 #endif  // IS_SLIM_BUILD
58 };
59 
60 // Low-level interface to read TFRecord files.
61 //
62 // If using compression or buffering, consider using SequentialRecordReader.
63 //
64 // Note: this class is not thread safe; external synchronization required.
65 class RecordReader {
66  public:
67   // Format of a single record:
68   //  uint64    length
69   //  uint32    masked crc of length
70   //  byte      data[length]
71   //  uint32    masked crc of data
72   static constexpr size_t kHeaderSize = sizeof(uint64) + sizeof(uint32);
73   static constexpr size_t kFooterSize = sizeof(uint32);
74 
75   // Statistics (sizes are in units of bytes)
76   struct Stats {
77     int64 file_size = -1;
78     int64 data_size = -1;
79     int64 entries = -1;  // Number of values
80   };
81 
82   // Metadata for the TFRecord file.
83   struct Metadata {
84     Stats stats;
85   };
86 
87   // Create a reader that will return log records from "*file".
88   // "*file" must remain live while this Reader is in use.
89   explicit RecordReader(
90       RandomAccessFile* file,
91       const RecordReaderOptions& options = RecordReaderOptions());
92 
93   virtual ~RecordReader() = default;
94 
95   // Read the record at "*offset" into *record and update *offset to
96   // point to the offset of the next record.  Returns OK on success,
97   // OUT_OF_RANGE for end of file, or something else for an error.
98   Status ReadRecord(uint64* offset, tstring* record);
99 
100   // Skip num_to_skip record starting at "*offset" and update *offset
101   // to point to the offset of the next num_to_skip + 1 record.
102   // Return OK on success, OUT_OF_RANGE for end of file, or something
103   // else for an error. "*num_skipped" records the number of records that
104   // are actually skipped. It should be equal to num_to_skip on success.
105   Status SkipRecords(uint64* offset, int num_to_skip, int* num_skipped);
106 
107   // Return the metadata of the Record file.
108   //
109   // The current implementation scans the file to completion,
110   // skipping over the data regions, to extract the metadata once
111   // on the first call to GetStats().  An improved implementation
112   // would change RecordWriter to write the metadata into TFRecord
113   // so that GetMetadata() could be a const method.
114   //
115   // 'metadata' must not be nullptr.
116   Status GetMetadata(Metadata* md);
117 
118  private:
119   Status ReadChecksummed(uint64 offset, size_t n, tstring* result);
120   Status PositionInputStream(uint64 offset);
121 
122   RecordReaderOptions options_;
123   std::unique_ptr<InputStreamInterface> input_stream_;
124   bool last_read_failed_;
125 
126   std::unique_ptr<Metadata> cached_metadata_;
127 
128   TF_DISALLOW_COPY_AND_ASSIGN(RecordReader);
129 };
130 
131 // High-level interface to read TFRecord files.
132 //
133 // Note: this class is not thread safe; external synchronization required.
134 class SequentialRecordReader {
135  public:
136   // Create a reader that will return log records from "*file".
137   // "*file" must remain live while this Reader is in use.
138   explicit SequentialRecordReader(
139       RandomAccessFile* file,
140       const RecordReaderOptions& options = RecordReaderOptions());
141 
142   virtual ~SequentialRecordReader() = default;
143 
144   // Read the next record in the file into *record. Returns OK on success,
145   // OUT_OF_RANGE for end of file, or something else for an error.
ReadRecord(tstring * record)146   Status ReadRecord(tstring* record) {
147     return underlying_.ReadRecord(&offset_, record);
148   }
149 
150   // Skip the next num_to_skip record in the file. Return OK on success,
151   // OUT_OF_RANGE for end of file, or something else for an error.
152   // "*num_skipped" records the number of records that are actually skipped.
153   // It should be equal to num_to_skip on success.
SkipRecords(int num_to_skip,int * num_skipped)154   Status SkipRecords(int num_to_skip, int* num_skipped) {
155     return underlying_.SkipRecords(&offset_, num_to_skip, num_skipped);
156   }
157 
158   // Return the current offset in the file.
TellOffset()159   uint64 TellOffset() { return offset_; }
160 
161   // Seek to this offset within the file and set this offset as the current
162   // offset. Trying to seek backward will throw error.
SeekOffset(uint64 offset)163   Status SeekOffset(uint64 offset) {
164     if (offset < offset_)
165       return errors::InvalidArgument(
166           "Trying to seek offset: ", offset,
167           " which is less than the current offset: ", offset_);
168     offset_ = offset;
169     return Status::OK();
170   }
171 
172  private:
173   RecordReader underlying_;
174   uint64 offset_ = 0;
175 };
176 
177 }  // namespace io
178 }  // namespace tensorflow
179 
180 #endif  // TENSORFLOW_CORE_LIB_IO_RECORD_READER_H_
181