1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/lib/io/buffered_inputstream.h"
17 
18 #include "tensorflow/core/lib/io/random_inputstream.h"
19 
20 namespace tensorflow {
21 namespace io {
22 
BufferedInputStream(InputStreamInterface * input_stream,size_t buffer_bytes,bool owns_input_stream)23 BufferedInputStream::BufferedInputStream(InputStreamInterface* input_stream,
24                                          size_t buffer_bytes,
25                                          bool owns_input_stream)
26     : input_stream_(input_stream),
27       size_(buffer_bytes),
28       owns_input_stream_(owns_input_stream) {
29   buf_.reserve(size_);
30 }
31 
BufferedInputStream(RandomAccessFile * file,size_t buffer_bytes)32 BufferedInputStream::BufferedInputStream(RandomAccessFile* file,
33                                          size_t buffer_bytes)
34     : BufferedInputStream(new RandomAccessInputStream(file), buffer_bytes,
35                           true) {}
36 
~BufferedInputStream()37 BufferedInputStream::~BufferedInputStream() {
38   if (owns_input_stream_) {
39     delete input_stream_;
40   }
41 }
42 
FillBuffer()43 Status BufferedInputStream::FillBuffer() {
44   if (!file_status_.ok()) {
45     pos_ = 0;
46     limit_ = 0;
47     return file_status_;
48   }
49   Status s = input_stream_->ReadNBytes(size_, &buf_);
50   pos_ = 0;
51   limit_ = buf_.size();
52   if (!s.ok()) {
53     file_status_ = s;
54   }
55   return s;
56 }
57 
58 template <typename StringType>
ReadLineHelper(StringType * result,bool include_eol)59 Status BufferedInputStream::ReadLineHelper(StringType* result,
60                                            bool include_eol) {
61   result->clear();
62   Status s;
63   while (true) {
64     if (pos_ == limit_) {
65       // Get more data into buffer
66       s = FillBuffer();
67       if (limit_ == 0) {
68         break;
69       }
70     }
71     char c = buf_[pos_++];
72     if (c == '\n') {
73       if (include_eol) {
74         result->append(1, c);
75       }
76       return Status::OK();
77     }
78     // We don't append '\r' to *result
79     if (c != '\r') {
80       result->append(1, c);
81     }
82   }
83   if (errors::IsOutOfRange(s) && !result->empty()) {
84     return Status::OK();
85   }
86   return s;
87 }
88 
ReadNBytes(int64 bytes_to_read,tstring * result)89 Status BufferedInputStream::ReadNBytes(int64 bytes_to_read, tstring* result) {
90   if (bytes_to_read < 0) {
91     return errors::InvalidArgument("Can't read a negative number of bytes: ",
92                                    bytes_to_read);
93   }
94   result->clear();
95   if (pos_ == limit_ && !file_status_.ok() && bytes_to_read > 0) {
96     return file_status_;
97   }
98   result->reserve(bytes_to_read);
99 
100   Status s;
101   while (result->size() < static_cast<size_t>(bytes_to_read)) {
102     // Check whether the buffer is fully read or not.
103     if (pos_ == limit_) {
104       s = FillBuffer();
105       // If we didn't read any bytes, we're at the end of the file; break out.
106       if (limit_ == 0) {
107         DCHECK(!s.ok());
108         file_status_ = s;
109         break;
110       }
111     }
112     const int64 bytes_to_copy =
113         std::min<int64>(limit_ - pos_, bytes_to_read - result->size());
114     result->insert(result->size(), buf_, pos_, bytes_to_copy);
115     pos_ += bytes_to_copy;
116   }
117   // Filling the buffer might lead to a situation when we go past the end of
118   // the file leading to an OutOfRange() status return. But we might have
119   // obtained enough data to satisfy the function call. Returning OK then.
120   if (errors::IsOutOfRange(s) &&
121       (result->size() == static_cast<size_t>(bytes_to_read))) {
122     return Status::OK();
123   }
124   return s;
125 }
126 
SkipNBytes(int64 bytes_to_skip)127 Status BufferedInputStream::SkipNBytes(int64 bytes_to_skip) {
128   if (bytes_to_skip < 0) {
129     return errors::InvalidArgument("Can only skip forward, not ",
130                                    bytes_to_skip);
131   }
132   if (pos_ + bytes_to_skip < limit_) {
133     // If we aren't skipping too much, then we can just move pos_;
134     pos_ += bytes_to_skip;
135   } else {
136     // Otherwise, we already have read limit_ - pos_, so skip the rest. At this
137     // point we need to get fresh data into the buffer, so reset pos_ and
138     // limit_.
139     Status s = input_stream_->SkipNBytes(bytes_to_skip - (limit_ - pos_));
140     pos_ = 0;
141     limit_ = 0;
142     if (errors::IsOutOfRange(s)) {
143       file_status_ = s;
144     }
145     return s;
146   }
147   return Status::OK();
148 }
149 
Tell() const150 int64 BufferedInputStream::Tell() const {
151   return input_stream_->Tell() - (limit_ - pos_);
152 }
153 
Seek(int64 position)154 Status BufferedInputStream::Seek(int64 position) {
155   if (position < 0) {
156     return errors::InvalidArgument("Seeking to a negative position: ",
157                                    position);
158   }
159 
160   // Position of the buffer's lower limit within file.
161   const int64 buf_lower_limit = input_stream_->Tell() - limit_;
162   if (position < buf_lower_limit) {
163     // Seek before buffer, reset input stream and skip 'position' bytes.
164     TF_RETURN_IF_ERROR(Reset());
165     return SkipNBytes(position);
166   }
167 
168   if (position < Tell()) {
169     // Seek within buffer before 'pos_'
170     pos_ -= Tell() - position;
171     return Status::OK();
172   }
173 
174   // Seek after 'pos_'
175   return SkipNBytes(position - Tell());
176 }
177 
178 template <typename T>
ReadAll(T * result)179 Status BufferedInputStream::ReadAll(T* result) {
180   result->clear();
181   Status status;
182   while (status.ok()) {
183     status = FillBuffer();
184     if (limit_ == 0) {
185       break;
186     }
187     result->append(buf_);
188     pos_ = limit_;
189   }
190 
191   if (errors::IsOutOfRange(status)) {
192     file_status_ = status;
193     return Status::OK();
194   }
195   return status;
196 }
197 
198 template Status BufferedInputStream::ReadAll<string>(string* result);
199 template Status BufferedInputStream::ReadAll<tstring>(tstring* result);
200 
Reset()201 Status BufferedInputStream::Reset() {
202   TF_RETURN_IF_ERROR(input_stream_->Reset());
203   pos_ = 0;
204   limit_ = 0;
205   file_status_ = Status::OK();
206   return Status::OK();
207 }
208 
ReadLine(string * result)209 Status BufferedInputStream::ReadLine(string* result) {
210   return ReadLineHelper(result, false);
211 }
212 
ReadLine(tstring * result)213 Status BufferedInputStream::ReadLine(tstring* result) {
214   return ReadLineHelper(result, false);
215 }
216 
ReadLineAsString()217 string BufferedInputStream::ReadLineAsString() {
218   string result;
219   ReadLineHelper(&result, true).IgnoreError();
220   return result;
221 }
222 
223 }  // namespace io
224 }  // namespace tensorflow
225