1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/lib/io/snappy/snappy_outputbuffer.h"
17 
18 namespace tensorflow {
19 namespace io {
20 
SnappyOutputBuffer(WritableFile * file,int32 input_buffer_bytes,int32 output_buffer_bytes)21 SnappyOutputBuffer::SnappyOutputBuffer(WritableFile* file,
22                                        int32 input_buffer_bytes,
23                                        int32 output_buffer_bytes)
24     : file_(file),
25       input_buffer_(new char[input_buffer_bytes]),
26       input_buffer_capacity_(input_buffer_bytes),
27       next_in_(input_buffer_.get()),
28       output_buffer_(new char[output_buffer_bytes]),
29       output_buffer_capacity_(output_buffer_bytes),
30       next_out_(output_buffer_.get()),
31       avail_out_(output_buffer_bytes) {}
32 
~SnappyOutputBuffer()33 SnappyOutputBuffer::~SnappyOutputBuffer() {
34   size_t bytes_to_write = output_buffer_capacity_ - avail_out_;
35   if (bytes_to_write > 0) {
36     LOG(WARNING) << "There is still data in the output buffer. "
37                  << "Possible data loss has occurred.";
38   }
39 }
40 
Append(StringPiece data)41 Status SnappyOutputBuffer::Append(StringPiece data) { return Write(data); }
42 
#if defined(TF_CORD_SUPPORT)
// Appends the chunks of `cord` one after another, in iteration order.
// Stops at, and returns, the first error reported for a chunk.
Status SnappyOutputBuffer::Append(const absl::Cord& cord) {
  for (absl::string_view chunk : cord.Chunks()) {
    Status chunk_status = Append(chunk);
    if (!chunk_status.ok()) {
      return chunk_status;
    }
  }
  return Status::OK();
}
#endif
51 
Close()52 Status SnappyOutputBuffer::Close() {
53   // Given that we do not own `file`, we don't close it.
54   return Flush();
55 }
56 
Name(StringPiece * result) const57 Status SnappyOutputBuffer::Name(StringPiece* result) const {
58   return file_->Name(result);
59 }
60 
Sync()61 Status SnappyOutputBuffer::Sync() {
62   TF_RETURN_IF_ERROR(Flush());
63   return file_->Sync();
64 }
65 
Tell(int64 * position)66 Status SnappyOutputBuffer::Tell(int64* position) {
67   return file_->Tell(position);
68 }
69 
Write(StringPiece data)70 Status SnappyOutputBuffer::Write(StringPiece data) {
71   //
72   // The deflated output is accumulated in output_buffer_ and gets written to
73   // file as and when needed.
74 
75   size_t bytes_to_write = data.size();
76 
77   // If there is sufficient free space in input_buffer_ to fit data we
78   // add it there and return.
79   if (static_cast<int32>(bytes_to_write) <= AvailableInputSpace()) {
80     AddToInputBuffer(data);
81     return Status::OK();
82   }
83 
84   // If there isn't enough available space in the input_buffer_ we empty it
85   // by uncompressing its contents. If data now fits in input_buffer_
86   // we add it there else we directly deflate it.
87   TF_RETURN_IF_ERROR(DeflateBuffered());
88 
89   // input_buffer_ should be empty at this point.
90   if (static_cast<int32>(bytes_to_write) <= AvailableInputSpace()) {
91     AddToInputBuffer(data);
92     return Status::OK();
93   }
94 
95   // `data` is too large to fit in input buffer so we deflate it directly.
96   // Note that at this point we have already deflated all existing input so
97   // we do not need to backup next_in and avail_in.
98   next_in_ = const_cast<char*>(data.data());
99   avail_in_ = bytes_to_write;
100 
101   TF_RETURN_IF_ERROR(Deflate());
102 
103   DCHECK_EQ(avail_in_, 0);  // All input will be used up.
104 
105   next_in_ = input_buffer_.get();
106 
107   return Status::OK();
108 }
109 
Flush()110 Status SnappyOutputBuffer::Flush() {
111   TF_RETURN_IF_ERROR(DeflateBuffered());
112   TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
113   return Status::OK();
114 }
115 
AvailableInputSpace() const116 int32 SnappyOutputBuffer::AvailableInputSpace() const {
117   return input_buffer_capacity_ - avail_in_;
118 }
119 
AddToInputBuffer(StringPiece data)120 void SnappyOutputBuffer::AddToInputBuffer(StringPiece data) {
121   size_t bytes_to_write = data.size();
122   DCHECK_LE(bytes_to_write, AvailableInputSpace());
123 
124   // Input stream ->
125   // [....................input_buffer_capacity_...............]
126   // [<...read_bytes...><...avail_in...>......empty space......]
127   //  ^                 ^
128   //  |                 |
129   //  input_buffer_   next_in
130   //
131   // Data in the input stream is sharded as shown above. next_in_ could
132   // be pointing to some byte in the buffer with avail_in number of bytes
133   // available to be read.
134   //
135   // In order to avoid shifting the avail_in bytes at next_in to the head of
136   // the buffer we try to fit `data` in the empty space at the tail of the
137   // input stream.
138   // TODO(srbs): This could be avoided if we had a circular buffer.
139   // If it doesn't fit we free the space at the head of the stream and then
140   // append `data` at the end of existing data.
141 
142   const int32 read_bytes = next_in_ - input_buffer_.get();
143   const int32 unread_bytes = avail_in_;
144   const int32 free_tail_bytes =
145       input_buffer_capacity_ - (read_bytes + unread_bytes);
146 
147   if (static_cast<int32>(bytes_to_write) > free_tail_bytes) {
148     memmove(input_buffer_.get(), next_in_, avail_in_);
149     next_in_ = input_buffer_.get();
150   }
151   memcpy(next_in_ + avail_in_, data.data(), bytes_to_write);
152   avail_in_ += bytes_to_write;
153 }
154 
AddToOutputBuffer(const char * data,size_t length)155 Status SnappyOutputBuffer::AddToOutputBuffer(const char* data, size_t length) {
156   while (length > 0) {
157     size_t bytes_to_copy = std::min(length, avail_out_);
158     memcpy(next_out_, data, bytes_to_copy);
159     data += bytes_to_copy;
160     next_out_ += bytes_to_copy;
161     avail_out_ -= bytes_to_copy;
162     length -= bytes_to_copy;
163     if (avail_out_ == 0) {
164       TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
165     }
166   }
167   return Status::OK();
168 }
169 
DeflateBuffered()170 Status SnappyOutputBuffer::DeflateBuffered() {
171   TF_RETURN_IF_ERROR(Deflate());
172   DCHECK_EQ(avail_in_, 0);
173   next_in_ = input_buffer_.get();
174   return Status::OK();
175 }
176 
FlushOutputBufferToFile()177 Status SnappyOutputBuffer::FlushOutputBufferToFile() {
178   size_t bytes_to_write = output_buffer_capacity_ - avail_out_;
179   if (bytes_to_write > 0) {
180     Status s = file_->Append(StringPiece(
181         reinterpret_cast<char*>(output_buffer_.get()), bytes_to_write));
182     if (s.ok()) {
183       next_out_ = output_buffer_.get();
184       avail_out_ = output_buffer_capacity_;
185     }
186     return s;
187   }
188   return Status::OK();
189 }
190 
Deflate()191 Status SnappyOutputBuffer::Deflate() {
192   if (avail_in_ == 0) {
193     return Status::OK();
194   }
195   string output;
196   if (!port::Snappy_Compress(next_in_, avail_in_, &output)) {
197     return errors::DataLoss("Snappy_Compress failed");
198   }
199 
200   // Write length of compressed block to output buffer.
201   char compressed_length_array[4];
202   std::fill(compressed_length_array, compressed_length_array + 4, 0);
203   for (int i = 0; i < 4; i++) {
204     // Little endian.
205     compressed_length_array[i] = output.size() >> (8 * (3 - i));
206   }
207   TF_RETURN_IF_ERROR(AddToOutputBuffer(compressed_length_array, 4));
208 
209   // Write compressed output to buffer.
210   TF_RETURN_IF_ERROR(AddToOutputBuffer(output.data(), output.size()));
211   next_in_ += avail_in_;
212   avail_in_ = 0;
213 
214   return Status::OK();
215 }
216 
217 }  // namespace io
218 }  // namespace tensorflow
219