1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
#include "tensorflow/core/lib/io/zlib_outputbuffer.h"

#include <algorithm>
#include <limits>

#include "tensorflow/core/lib/core/errors.h"
19 
20 namespace tensorflow {
21 namespace io {
22 
ZlibOutputBuffer(WritableFile * file,int32 input_buffer_bytes,int32 output_buffer_bytes,const ZlibCompressionOptions & zlib_options)23 ZlibOutputBuffer::ZlibOutputBuffer(
24     WritableFile* file,
25     int32 input_buffer_bytes,  // size of z_stream.next_in buffer
26     int32 output_buffer_bytes,
27     const ZlibCompressionOptions&
28         zlib_options)  // size of z_stream.next_out buffer
29     : file_(file),
30       init_status_(),
31       input_buffer_capacity_(input_buffer_bytes),
32       output_buffer_capacity_(output_buffer_bytes),
33       z_stream_input_(new Bytef[input_buffer_bytes]),
34       z_stream_output_(new Bytef[output_buffer_bytes]),
35       zlib_options_(zlib_options),
36       z_stream_(new z_stream) {}
37 
~ZlibOutputBuffer()38 ZlibOutputBuffer::~ZlibOutputBuffer() {
39   if (z_stream_) {
40     LOG(WARNING) << "ZlibOutputBuffer::Close() not called. Possible data loss";
41   }
42 }
43 
Init()44 Status ZlibOutputBuffer::Init() {
45   // Output buffer size should be greater than 1 because deflation needs at
46   // least one byte for book keeping etc.
47   if (output_buffer_capacity_ <= 1) {
48     return errors::InvalidArgument(
49         "output_buffer_bytes should be greater than "
50         "1");
51   }
52   memset(z_stream_.get(), 0, sizeof(z_stream));
53   z_stream_->zalloc = Z_NULL;
54   z_stream_->zfree = Z_NULL;
55   z_stream_->opaque = Z_NULL;
56   int status =
57       deflateInit2(z_stream_.get(), zlib_options_.compression_level,
58                    zlib_options_.compression_method, zlib_options_.window_bits,
59                    zlib_options_.mem_level, zlib_options_.compression_strategy);
60   if (status != Z_OK) {
61     z_stream_.reset(nullptr);
62     return errors::InvalidArgument("deflateInit failed with status", status);
63   }
64   z_stream_->next_in = z_stream_input_.get();
65   z_stream_->next_out = z_stream_output_.get();
66   z_stream_->avail_in = 0;
67   z_stream_->avail_out = output_buffer_capacity_;
68   return Status::OK();
69 }
70 
AvailableInputSpace() const71 int32 ZlibOutputBuffer::AvailableInputSpace() const {
72   return input_buffer_capacity_ - z_stream_->avail_in;
73 }
74 
AddToInputBuffer(StringPiece data)75 void ZlibOutputBuffer::AddToInputBuffer(StringPiece data) {
76   size_t bytes_to_write = data.size();
77   CHECK_LE(bytes_to_write, AvailableInputSpace());
78 
79   // Input stream ->
80   // [....................input_buffer_capacity_...............]
81   // [<...read_bytes...><...avail_in...>......empty space......]
82   //  ^                 ^
83   //  |                 |
84   //  z_stream_input_   next_in
85   //
86   // Data in the input stream is sharded as show above. z_stream_->next_in could
87   // be pointing to some byte in the buffer with avail_in number of bytes
88   // available to be read.
89   //
90   // In order to avoid shifting the avail_in bytes at next_in to the head of
91   // the buffer we try to fit `data` in the empty space at the tail of the
92   // input stream.
93   // TODO(srbs): This could be avoided if we had a circular buffer.
94   // If it doesn't fit we free the space at the head of the stream and then
95   // append `data` at the end of existing data.
96 
97   int32 read_bytes = z_stream_->next_in - z_stream_input_.get();
98   int32 unread_bytes = z_stream_->avail_in;
99   int32 free_tail_bytes = input_buffer_capacity_ - (read_bytes + unread_bytes);
100 
101   if (static_cast<int32>(bytes_to_write) > free_tail_bytes) {
102     memmove(z_stream_input_.get(), z_stream_->next_in, z_stream_->avail_in);
103     z_stream_->next_in = z_stream_input_.get();
104   }
105   memcpy(z_stream_->next_in + z_stream_->avail_in, data.data(), bytes_to_write);
106   z_stream_->avail_in += bytes_to_write;
107 }
108 
DeflateBuffered(int flush_mode)109 Status ZlibOutputBuffer::DeflateBuffered(int flush_mode) {
110   do {
111     // From zlib manual (http://www.zlib.net/manual.html):
112     //
113     // "In the case of a Z_FULL_FLUSH or Z_SYNC_FLUSH, make sure that
114     // avail_out is greater than six to avoid repeated flush markers due
115     // to avail_out == 0 on return."
116     //
117     // If above condition is met or if output buffer is full we flush contents
118     // to file.
119     if (z_stream_->avail_out == 0 ||
120         (IsSyncOrFullFlush(flush_mode) && z_stream_->avail_out < 6)) {
121       TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
122     }
123     TF_RETURN_IF_ERROR(Deflate(flush_mode));
124   } while (z_stream_->avail_out == 0);
125 
126   DCHECK(z_stream_->avail_in == 0);
127   z_stream_->next_in = z_stream_input_.get();
128   return Status::OK();
129 }
130 
FlushOutputBufferToFile()131 Status ZlibOutputBuffer::FlushOutputBufferToFile() {
132   uint32 bytes_to_write = output_buffer_capacity_ - z_stream_->avail_out;
133   if (bytes_to_write > 0) {
134     Status s = file_->Append(StringPiece(
135         reinterpret_cast<char*>(z_stream_output_.get()), bytes_to_write));
136     if (s.ok()) {
137       z_stream_->next_out = z_stream_output_.get();
138       z_stream_->avail_out = output_buffer_capacity_;
139     }
140     return s;
141   }
142   return Status::OK();
143 }
144 
Append(StringPiece data)145 Status ZlibOutputBuffer::Append(StringPiece data) {
146   // If there is sufficient free space in z_stream_input_ to fit data we
147   // add it there and return.
148   // If there isn't enough space we deflate the existing contents of
149   // z_input_stream_. If data now fits in z_input_stream_ we add it there
150   // else we directly deflate it.
151   //
152   // The deflated output is accumulated in z_stream_output_ and gets written to
153   // file as and when needed.
154 
155   size_t bytes_to_write = data.size();
156 
157   if (static_cast<int32>(bytes_to_write) <= AvailableInputSpace()) {
158     AddToInputBuffer(data);
159     return Status::OK();
160   }
161 
162   TF_RETURN_IF_ERROR(DeflateBuffered(zlib_options_.flush_mode));
163 
164   // At this point input stream should be empty.
165   if (static_cast<int32>(bytes_to_write) <= AvailableInputSpace()) {
166     AddToInputBuffer(data);
167     return Status::OK();
168   }
169 
170   // `data` is too large to fit in input buffer so we deflate it directly.
171   // Note that at this point we have already deflated all existing input so
172   // we do not need to backup next_in and avail_in.
173   z_stream_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(data.data()));
174   z_stream_->avail_in = bytes_to_write;
175 
176   do {
177     if (z_stream_->avail_out == 0) {
178       // No available output space.
179       // Write output buffer to file.
180       TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
181     }
182     TF_RETURN_IF_ERROR(Deflate(zlib_options_.flush_mode));
183   } while (z_stream_->avail_out == 0);
184 
185   DCHECK(z_stream_->avail_in == 0);  // All input will be used up.
186 
187   // Restore z_stream input pointers.
188   z_stream_->next_in = z_stream_input_.get();
189 
190   return Status::OK();
191 }
192 
#if defined(TF_CORD_SUPPORT)
// Appends a Cord by forwarding each of its contiguous chunks, in order, to
// Append(StringPiece). Stops at the first failing chunk.
Status ZlibOutputBuffer::Append(const absl::Cord& cord) {
  for (const absl::string_view chunk : cord.Chunks()) {
    TF_RETURN_IF_ERROR(Append(chunk));
  }
  return Status::OK();
}
#endif
201 
Flush()202 Status ZlibOutputBuffer::Flush() {
203   TF_RETURN_IF_ERROR(DeflateBuffered(Z_PARTIAL_FLUSH));
204   TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
205   return file_->Flush();
206 }
207 
Name(StringPiece * result) const208 Status ZlibOutputBuffer::Name(StringPiece* result) const {
209   return file_->Name(result);
210 }
211 
Sync()212 Status ZlibOutputBuffer::Sync() {
213   TF_RETURN_IF_ERROR(Flush());
214   return file_->Sync();
215 }
216 
Close()217 Status ZlibOutputBuffer::Close() {
218   if (z_stream_) {
219     TF_RETURN_IF_ERROR(DeflateBuffered(Z_FINISH));
220     TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
221     deflateEnd(z_stream_.get());
222     z_stream_.reset(nullptr);
223   }
224   return Status::OK();
225 }
226 
Deflate(int flush)227 Status ZlibOutputBuffer::Deflate(int flush) {
228   int error = deflate(z_stream_.get(), flush);
229   if (error == Z_OK || error == Z_BUF_ERROR ||
230       (error == Z_STREAM_END && flush == Z_FINISH)) {
231     return Status::OK();
232   }
233   string error_string = strings::StrCat("deflate() failed with error ", error);
234   if (z_stream_->msg != nullptr) {
235     strings::StrAppend(&error_string, ": ", z_stream_->msg);
236   }
237   return errors::DataLoss(error_string);
238 }
239 
Tell(int64 * position)240 Status ZlibOutputBuffer::Tell(int64* position) { return file_->Tell(position); }
241 
242 }  // namespace io
243 }  // namespace tensorflow
244