1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_LIB_IO_SNAPPY_SNAPPY_INPUTSTREAM_H_
17 #define TENSORFLOW_CORE_LIB_IO_SNAPPY_SNAPPY_INPUTSTREAM_H_
18 
#include <cstddef>
#include <memory>

#include "tensorflow/core/lib/io/inputstream_interface.h"
20 
21 namespace tensorflow {
22 namespace io {
23 
24 class SnappyInputStream : public InputStreamInterface {
25  public:
26   // Creates a SnappyInputStream for `input_stream`.
27   //
28   // Takes ownership  of `input_stream` iff `owns_input_stream` is true.
29   SnappyInputStream(InputStreamInterface* input_stream,
30                     size_t output_buffer_bytes, bool owns_input_stream);
31 
32   // Equivalent to the previous constructor with owns_input_stream = false.
33   explicit SnappyInputStream(InputStreamInterface* input_stream,
34                              size_t output_buffer_bytes);
35 
36   ~SnappyInputStream() override;
37 
38   // Reads bytes_to_read bytes into *result, overwriting *result.
39   //
40   // Return Status codes:
41   // OK:           If successful.
42   // OUT_OF_RANGE: If there are not enough bytes to read before
43   //               the end of the stream.
44   // ABORTED:      If inflate() fails, we return the error code with the
45   //               error message in `z_stream_->msg`.
46   // others:       If reading from stream failed.
47   Status ReadNBytes(int64 bytes_to_read, tstring* result) override;
48 
49 #if defined(TF_CORD_SUPPORT)
50   Status ReadNBytes(int64 bytes_to_read, absl::Cord* result) override;
51 #endif
52 
53   int64 Tell() const override;
54 
55   Status Reset() override;
56 
57  private:
58   // Decompress the next chunk of data and place the data into the cache.
59   Status Inflate();
60 
61   // Attempt to read `bytes_to_read` from the decompressed data cache. Returns
62   // the actual number of bytes read.
63   size_t ReadBytesFromCache(size_t bytes_to_read, char* result);
64 
65   InputStreamInterface* input_stream_;
66   const size_t output_buffer_bytes_;
67   const bool owns_input_stream_;
68 
69   // Specifies the number of decompressed bytes currently read.
70   int64 bytes_read_;
71 
72   // output_buffer_ contains decompressed data not yet read by the client.
73   std::unique_ptr<char[]> output_buffer_;
74 
75   // next_out_ points to the position in the `output_buffer_` that contains the
76   // next unread byte.
77   char* next_out_;
78 
79   // avail_out_ specifies the number of bytes left in the output_buffers_ that
80   // is not yet read.
81   size_t avail_out_;
82 
83   TF_DISALLOW_COPY_AND_ASSIGN(SnappyInputStream);
84 };
85 
86 }  // namespace io
87 }  // namespace tensorflow
88 
89 #endif  // TENSORFLOW_CORE_LIB_IO_SNAPPY_SNAPPY_INPUTSTREAM_H_
90