1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/lib/io/snappy/snappy_inputbuffer.h"
17 
18 namespace tensorflow {
19 namespace io {
SnappyInputBuffer(RandomAccessFile * file,size_t input_buffer_bytes,size_t output_buffer_bytes)20 SnappyInputBuffer::SnappyInputBuffer(
21     RandomAccessFile* file,
22     size_t input_buffer_bytes,  // size of input_buffer_
23     size_t output_buffer_bytes  // size of output_buffer_
24     )
25     : file_(file),
26       input_buffer_capacity_(input_buffer_bytes),
27       output_buffer_capacity_(output_buffer_bytes),
28       input_buffer_(new char[input_buffer_capacity_]),
29       output_buffer_(new char[output_buffer_capacity_]),
30       next_in_(input_buffer_.get()),
31       bytes_read_(0) {}
32 
ReadNBytes(int64 bytes_to_read,tstring * result)33 Status SnappyInputBuffer::ReadNBytes(int64 bytes_to_read, tstring* result) {
34   result->clear();
35   result->resize_uninitialized(bytes_to_read);
36 
37   char* result_ptr = result->mdata();
38 
39   // Read as many bytes as possible from cache.
40   size_t bytes_read = ReadBytesFromCache(bytes_to_read, result_ptr);
41   bytes_to_read -= bytes_read;
42   result_ptr += bytes_read;
43 
44   while (bytes_to_read > 0) {
45     // At this point we can be sure that cache has been emptied.
46     DCHECK_EQ(avail_out_, 0);
47 
48     // Now that the cache is empty we need to inflate more data.
49     TF_RETURN_IF_ERROR(Inflate());
50 
51     bytes_read = ReadBytesFromCache(bytes_to_read, result_ptr);
52     bytes_to_read -= bytes_read;
53     result_ptr += bytes_read;
54   }
55 
56   return Status::OK();
57 }
58 
Tell() const59 int64 SnappyInputBuffer::Tell() const { return bytes_read_; }
60 
Reset()61 Status SnappyInputBuffer::Reset() {
62   file_pos_ = 0;
63   avail_in_ = 0;
64   avail_out_ = 0;
65   next_in_ = input_buffer_.get();
66   bytes_read_ = 0;
67   return Status::OK();
68 }
69 
ReadBytesFromCache(size_t bytes_to_read,char * result_ptr)70 size_t SnappyInputBuffer::ReadBytesFromCache(size_t bytes_to_read,
71                                              char* result_ptr) {
72   size_t can_read_bytes = std::min(bytes_to_read, avail_out_);
73   if (can_read_bytes > 0) {
74     memcpy(result_ptr, next_out_, can_read_bytes);
75     next_out_ += can_read_bytes;
76     avail_out_ -= can_read_bytes;
77   }
78   bytes_read_ += can_read_bytes;
79   return can_read_bytes;
80 }
81 
Inflate()82 Status SnappyInputBuffer::Inflate() {
83   // Read length of compressed block.
84   uint32 compressed_block_length;
85   TF_RETURN_IF_ERROR(ReadCompressedBlockLength(&compressed_block_length));
86 
87   // If the entire block is not in cache do a read from file.
88   if (avail_in_ < compressed_block_length) {
89     TF_RETURN_IF_ERROR(ReadFromFile());
90     if (avail_in_ < compressed_block_length) {
91       if (compressed_block_length > input_buffer_capacity_) {
92         return errors::ResourceExhausted(
93             "Input buffer(size: ", input_buffer_capacity_,
94             " bytes) too small. Should be larger ", "than ",
95             compressed_block_length, " bytes.");
96       } else {
97         return errors::DataLoss(
98             strings::StrCat("Failed to read ", compressed_block_length,
99                             " bytes from file. Possible data corruption."));
100       }
101     }
102   }
103 
104   size_t uncompressed_length;
105   if (!port::Snappy_GetUncompressedLength(next_in_, compressed_block_length,
106                                           &uncompressed_length)) {
107     return errors::DataLoss("Parsing error in Snappy_GetUncompressedLength");
108   }
109 
110   // Output buffer must have been cleared before uncompressing more input.
111   DCHECK_EQ(avail_out_, 0);
112 
113   // Output buffer must be large enough to fit the uncompressed block.
114   DCHECK_GE(output_buffer_capacity_, uncompressed_length);
115   next_out_ = output_buffer_.get();
116 
117   bool status = port::Snappy_Uncompress(next_in_, compressed_block_length,
118                                         output_buffer_.get());
119   if (!status) {
120     return errors::DataLoss("Snappy_Uncompress failed");
121   }
122   next_in_ += compressed_block_length;
123   avail_in_ -= compressed_block_length;
124   avail_out_ += uncompressed_length;
125   return Status::OK();
126 }
127 
ReadCompressedBlockLength(uint32 * length)128 Status SnappyInputBuffer::ReadCompressedBlockLength(uint32* length) {
129   *length = 0;
130   size_t bytes_to_read = 4;
131   while (bytes_to_read > 0) {
132     if (avail_in_ == 0) {
133       TF_RETURN_IF_ERROR(ReadFromFile());
134     }
135     size_t readable = std::min(bytes_to_read, avail_in_);
136 
137     for (size_t i = 0; i < readable; i++) {
138       // The "unsigned char" type cast is intentional to avoid implicit type
139       // casting of the signed char to unsigned int during bitwise OR which
140       // causes weird overflow errors.
141       *length = (*length << 8) | static_cast<unsigned char>(next_in_[0]);
142       bytes_to_read--;
143       next_in_++;
144       avail_in_--;
145     }
146   }
147   return Status::OK();
148 }
149 
ReadFromFile()150 Status SnappyInputBuffer::ReadFromFile() {
151   int bytes_to_read = input_buffer_capacity_;
152   char* read_location = reinterpret_cast<char*>(input_buffer_.get());
153 
154   // If there are unread bytes in the input stream we move them to the head
155   // of the stream to maximize the space available to read new data into.
156   // TODO(srbs): A circular buffer would be useful here.
157   if (avail_in_ > 0) {
158     size_t read_bytes = next_in_ - input_buffer_.get();
159     // Remove `read_bytes` from the head of the input stream.
160     // Move unread bytes to the head of the input stream.
161     if (read_bytes > 0) {
162       memmove(input_buffer_.get(), next_in_, avail_in_);
163     }
164 
165     bytes_to_read -= avail_in_;
166     read_location += avail_in_;
167   }
168   StringPiece data;
169   // Try to read enough data to fill up input_buffer_.
170   Status s = file_->Read(file_pos_, bytes_to_read, &data, read_location);
171   if (data.data() != read_location) {
172     memmove(read_location, data.data(), data.size());
173   }
174 
175   // Since we moved unread data to the head of the input stream we can point
176   // next_in to the head of the input stream.
177   next_in_ = input_buffer_.get();
178 
179   // Note: data.size() could be different from bytes_to_read.
180   avail_in_ += data.size();
181   file_pos_ += data.size();
182 
183   if (!s.ok() && !errors::IsOutOfRange(s)) {
184     return s;
185   }
186 
187   // We throw OutOfRange error iff no new data has been read from file.
188   // Since we never check how much data is remaining in the file, it is
189   // possible that on the last read there isn't enough data in the file to
190   // fill up the buffer in which case file_->ReadNBytes would return an
191   // OutOfRange error.
192   if (data.empty()) {
193     return errors::OutOfRange("EOF reached");
194   }
195   if (errors::IsOutOfRange(s)) {
196     return Status::OK();
197   }
198 
199   return s;
200 }
201 
202 }  // namespace io
203 }  // namespace tensorflow
204