1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_LIB_IO_INPUTBUFFER_H_
17 #define TENSORFLOW_LIB_IO_INPUTBUFFER_H_
18 
19 #include <string>
20 
21 #include "tensorflow/core/lib/core/coding.h"
22 #include "tensorflow/core/lib/core/status.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/macros.h"
25 #include "tensorflow/core/platform/types.h"
26 
27 namespace tensorflow {
28 namespace io {
29 
30 // An InputBuffer provides a buffer on top of a RandomAccessFile.
31 // A given instance of an InputBuffer is NOT safe for concurrent use
32 // by multiple threads
33 class InputBuffer {
34  public:
35   // Create an InputBuffer for "file" with a buffer size of
36   // "buffer_bytes" bytes.  'file' must outlive *this.
37   InputBuffer(RandomAccessFile* file, size_t buffer_bytes);
38   ~InputBuffer();
39 
40   // Read one text line of data into "*result" until end-of-file or a
41   // \n is read.  (The \n is not included in the result.)  Overwrites
42   // any existing data in *result.
43   //
44   // If successful, returns OK.  If we are already at the end of the
45   // file, we return an OUT_OF_RANGE error.  Otherwise, we return
46   // some other non-OK status.
47   template <typename T>
48   Status ReadLine(T* result);
49 
50   // Reads bytes_to_read bytes into *result, overwriting *result.
51   //
52   // If successful, returns OK.  If we there are not enough bytes to
53   // read before the end of the file, we return an OUT_OF_RANGE error.
54   // Otherwise, we return some other non-OK status.
55   Status ReadNBytes(int64 bytes_to_read, string* result);
56 
57   // An overload that writes to char*.  Caller must ensure result[0,
58   // bytes_to_read) is valid to be overwritten.  Returns OK iff "*bytes_read ==
59   // bytes_to_read".
60   Status ReadNBytes(int64 bytes_to_read, char* result, size_t* bytes_read);
61 
62   // Reads a single varint32.
63   Status ReadVarint32(uint32* result);
64 
65   // Reads a single varint64.
66   Status ReadVarint64(uint64* result);
67 
68   // Like ReadNBytes() without returning the bytes read.
69   Status SkipNBytes(int64 bytes_to_skip);
70 
71   // Seek to this offset within the file.
72   //
73   // If we seek to somewhere within our pre-buffered data, we will re-use what
74   // data we can.  Otherwise, Seek() throws out the current buffer and the next
75   // read will trigger a File::Read().
76   Status Seek(int64 position);
77 
78   // Provides a hint about future reads, which may improve their performance.
79   Status Hint(int64 bytes_to_read);
80 
81   // Returns the position in the file.
Tell()82   int64 Tell() const { return file_pos_ - (limit_ - pos_); }
83 
84   // Returns the underlying RandomAccessFile.
file()85   RandomAccessFile* file() const { return file_; }
86 
87  private:
88   Status FillBuffer();
89 
90   // Internal slow-path routine used by ReadVarint32().
91   Status ReadVarint32Fallback(uint32* result);
92 
93   // Internal slow-path routine used by ReadVarint64().
94   Status ReadVarint64Fallback(uint64* result);
95 
96   // Helper method for reading a varint which can span at max `max_bytes`.
97   // If the varint is longer, a DataLoss error status is returned.
98   // If end of file is reached while reading, OutOfRange error is returned.
99   template <typename T>
100   Status ReadVarintFallback(T* result, int max_bytes);
101 
102   RandomAccessFile* file_;  // Not owned
103   int64 file_pos_;          // Next position to read from in "file_"
104   size_t size_;             // Size of "buf_"
105   char* buf_;               // The buffer itself
106   // [pos_,limit_) hold the "limit_ - pos_" bytes just before "file_pos_"
107   char* pos_;    // Current position in "buf"
108   char* limit_;  // Just past end of valid data in "buf"
109 
110   TF_DISALLOW_COPY_AND_ASSIGN(InputBuffer);
111 };
112 
113 // Implementation details.
114 
115 // Explicit instantiations defined in inputbuffer.cc.
116 extern template Status InputBuffer::ReadLine<string>(string* result);
117 extern template Status InputBuffer::ReadLine<tstring>(tstring* result);
118 
119 // Inlined for performance.
ReadVarint32(uint32 * result)120 inline Status InputBuffer::ReadVarint32(uint32* result) {
121   if (pos_ + core::kMaxVarint32Bytes <= limit_) {
122     // Fast path: directly parse from buffered data.
123     // Reads strictly from the range [pos_, limit_).
124     const char* offset = core::GetVarint32Ptr(pos_, limit_, result);
125     if (offset == nullptr) return errors::OutOfRange("Parsed past limit.");
126     pos_ = const_cast<char*>(offset);
127     return Status::OK();
128   } else {
129     return ReadVarint32Fallback(result);
130   }
131 }
132 
133 // Inlined for performance.
ReadVarint64(uint64 * result)134 inline Status InputBuffer::ReadVarint64(uint64* result) {
135   if (pos_ + core::kMaxVarint64Bytes <= limit_) {
136     // Fast path: directly parse from buffered data.
137     // Reads strictly from the range [pos_, limit_).
138     const char* offset = core::GetVarint64Ptr(pos_, limit_, result);
139     if (offset == nullptr) return errors::OutOfRange("Parsed past limit.");
140     pos_ = const_cast<char*>(offset);
141     return Status::OK();
142   } else {
143     return ReadVarint64Fallback(result);
144   }
145 }
146 
147 }  // namespace io
148 }  // namespace tensorflow
149 
150 #endif  // TENSORFLOW_LIB_IO_INPUTBUFFER_H_
151