1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_LIB_IO_RECORD_WRITER_H_
17 #define TENSORFLOW_CORE_LIB_IO_RECORD_WRITER_H_
18 
19 #include "tensorflow/core/lib/core/coding.h"
20 #include "tensorflow/core/lib/core/status.h"
21 #include "tensorflow/core/lib/core/stringpiece.h"
22 #include "tensorflow/core/lib/hash/crc32c.h"
23 #if !defined(IS_SLIM_BUILD)
24 #include "tensorflow/core/lib/io/snappy/snappy_compression_options.h"
25 #include "tensorflow/core/lib/io/snappy/snappy_outputbuffer.h"
26 #include "tensorflow/core/lib/io/zlib_compression_options.h"
27 #include "tensorflow/core/lib/io/zlib_outputbuffer.h"
28 #endif  // IS_SLIM_BUILD
29 #include "tensorflow/core/platform/cord.h"
30 #include "tensorflow/core/platform/macros.h"
31 #include "tensorflow/core/platform/types.h"
32 
33 namespace tensorflow {
34 
35 class WritableFile;
36 
37 namespace io {
38 
39 struct RecordWriterOptions {
40  public:
41   enum CompressionType {
42     NONE = 0,
43     ZLIB_COMPRESSION = 1,
44     SNAPPY_COMPRESSION = 2
45   };
46   CompressionType compression_type = NONE;
47 
48   static RecordWriterOptions CreateRecordWriterOptions(
49       const string& compression_type);
50 
51 #if !defined(IS_SLIM_BUILD)
52   // Options specific to compression.
53   tensorflow::io::ZlibCompressionOptions zlib_options;
54   tensorflow::io::SnappyCompressionOptions snappy_options;
55 #endif  // IS_SLIM_BUILD
56 };
57 
58 class RecordWriter {
59  public:
60   // Format of a single record:
61   //  uint64    length
62   //  uint32    masked crc of length
63   //  byte      data[length]
64   //  uint32    masked crc of data
65   static constexpr size_t kHeaderSize = sizeof(uint64) + sizeof(uint32);
66   static constexpr size_t kFooterSize = sizeof(uint32);
67 
68   // Create a writer that will append data to "*dest".
69   // "*dest" must be initially empty.
70   // "*dest" must remain live while this Writer is in use.
71   explicit RecordWriter(WritableFile* dest, const RecordWriterOptions& options =
72                                                 RecordWriterOptions());
73 
74   // Calls Close() and logs if an error occurs.
75   //
76   // TODO(jhseu): Require that callers explicitly call Close() and remove the
77   // implicit Close() call in the destructor.
78   ~RecordWriter();
79 
80   Status WriteRecord(StringPiece data);
81 
82 #if defined(TF_CORD_SUPPORT)
83   Status WriteRecord(const absl::Cord& data);
84 #endif
85 
86   // Flushes any buffered data held by underlying containers of the
87   // RecordWriter to the WritableFile. Does *not* flush the
88   // WritableFile.
89   Status Flush();
90 
91   // Writes all output to the file. Does *not* close the WritableFile.
92   //
93   // After calling Close(), any further calls to `WriteRecord()` or `Flush()`
94   // are invalid.
95   Status Close();
96 
97   // Utility method to populate TFRecord headers.  Populates record-header in
98   // "header[0,kHeaderSize-1]".  The record-header is based on data[0, n-1].
99   inline static void PopulateHeader(char* header, const char* data, size_t n);
100 
101   inline static void PopulateHeader(char* header, const absl::Cord& data);
102 
103   // Utility method to populate TFRecord footers.  Populates record-footer in
104   // "footer[0,kFooterSize-1]".  The record-footer is based on data[0, n-1].
105   inline static void PopulateFooter(char* footer, const char* data, size_t n);
106 
107 #if defined(TF_CORD_SUPPORT)
108   inline static void PopulateFooter(char* footer, const absl::Cord& data);
109 #endif
110 
111  private:
112   WritableFile* dest_;
113   RecordWriterOptions options_;
114 
MaskedCrc(const char * data,size_t n)115   inline static uint32 MaskedCrc(const char* data, size_t n) {
116     return crc32c::Mask(crc32c::Value(data, n));
117   }
118 
119 #if defined(TF_CORD_SUPPORT)
MaskedCrc(const absl::Cord & data)120   inline static uint32 MaskedCrc(const absl::Cord& data) {
121     return crc32c::Mask(crc32c::Value(data));
122   }
123 #endif
124 
125   TF_DISALLOW_COPY_AND_ASSIGN(RecordWriter);
126 };
127 
PopulateHeader(char * header,const char * data,size_t n)128 void RecordWriter::PopulateHeader(char* header, const char* data, size_t n) {
129   core::EncodeFixed64(header + 0, n);
130   core::EncodeFixed32(header + sizeof(uint64),
131                       MaskedCrc(header, sizeof(uint64)));
132 }
133 
PopulateFooter(char * footer,const char * data,size_t n)134 void RecordWriter::PopulateFooter(char* footer, const char* data, size_t n) {
135   core::EncodeFixed32(footer, MaskedCrc(data, n));
136 }
137 
138 #if defined(TF_CORD_SUPPORT)
PopulateHeader(char * header,const absl::Cord & data)139 void RecordWriter::PopulateHeader(char* header, const absl::Cord& data) {
140   core::EncodeFixed64(header + 0, data.size());
141   core::EncodeFixed32(header + sizeof(uint64),
142                       MaskedCrc(header, sizeof(uint64)));
143 }
144 
PopulateFooter(char * footer,const absl::Cord & data)145 void RecordWriter::PopulateFooter(char* footer, const absl::Cord& data) {
146   core::EncodeFixed32(footer, MaskedCrc(data));
147 }
148 #endif
149 
150 }  // namespace io
151 }  // namespace tensorflow
152 
153 #endif  // TENSORFLOW_CORE_LIB_IO_RECORD_WRITER_H_
154