1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_LIB_IO_COMPRESSED_OUTPUTBUFFER_H_
17 #define TENSORFLOW_CORE_LIB_IO_COMPRESSED_OUTPUTBUFFER_H_
18 
#include <zlib.h>

#include <memory>
#include <string>

#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/stringpiece.h"
#include "tensorflow/core/lib/io/zlib_compression_options.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/types.h"
30 
31 namespace tensorflow {
32 namespace io {
33 
34 // Provides support for writing compressed output to file using zlib
35 // (http://www.zlib.net/).
36 // A given instance of an ZlibOutputBuffer is NOT safe for concurrent use
37 // by multiple threads
38 class ZlibOutputBuffer : public WritableFile {
39  public:
40   // Create an ZlibOutputBuffer for `file` with two buffers that cache the
41   // 1. input data to be deflated
42   // 2. the deflated output
43   // with sizes `input_buffer_bytes` and `output_buffer_bytes` respectively.
44   // Does not take ownership of `file`.
45   // output_buffer_bytes should be greater than 1.
46   ZlibOutputBuffer(
47       WritableFile* file,
48       int32 input_buffer_bytes,   // size of z_stream.next_in buffer
49       int32 output_buffer_bytes,  // size of z_stream.next_out buffer
50       const ZlibCompressionOptions& zlib_options);
51 
52   ~ZlibOutputBuffer();
53 
54   // Initializes some state necessary for the output buffer. This call is
55   // required before any other operation on the buffer.
56   Status Init();
57 
58   // Adds `data` to the compression pipeline.
59   //
60   // The input data is buffered in `z_stream_input_` and is compressed in bulk
61   // when the buffer gets full. The compressed output is not immediately
62   // written to file but rather buffered in `z_stream_output_` and gets written
63   // to file when the buffer is full.
64   //
65   // To immediately write contents to file call `Flush()`.
66   Status Append(StringPiece data) override;
67 
68 #if defined(TF_CORD_SUPPORT)
69   Status Append(const absl::Cord& cord) override;
70 #endif
71 
72   // Deflates any cached input and writes all output to file.
73   Status Flush() override;
74 
75   // Compresses any cached input and writes all output to file. This must be
76   // called before the destructor to avoid any data loss.
77   //
78   // Contrary to `Flush()` this informs zlib that it should not expect any
79   // further input by using Z_FINISH flush mode. Also cleans up z_stream.
80   //
81   // After calling this, any further calls to `Write()`, `Flush()` or `Close()`
82   // will fail.
83   Status Close() override;
84 
85   // Returns the name of the underlying file.
86   Status Name(StringPiece* result) const override;
87 
88   // Deflates any cached input, writes all output to file and syncs it.
89   Status Sync() override;
90 
91   // Returns the write position in the underlying file. The position does not
92   // reflect buffered, un-flushed data.
93   Status Tell(int64* position) override;
94 
95  private:
96   WritableFile* file_;  // Not owned
97   Status init_status_;
98   size_t input_buffer_capacity_;
99   size_t output_buffer_capacity_;
100 
101   // Buffer for storing contents read from input `file_`.
102   // TODO(srbs): Consider using circular buffers. That would greatly simplify
103   // the implementation.
104   std::unique_ptr<Bytef[]> z_stream_input_;
105 
106   // Buffer for storing deflated contents of `file_`.
107   std::unique_ptr<Bytef[]> z_stream_output_;
108 
109   ZlibCompressionOptions const zlib_options_;
110 
111   // Configuration passed to `deflate`.
112   //
113   // z_stream_->next_in:
114   //   Next byte to compress. Points to some byte in z_stream_input_ buffer.
115   // z_stream_->avail_in:
116   //   Number of bytes available to be compressed at this time.
117   // z_stream_->next_out:
118   //   Next byte to write compressed data to. Points to some byte in
119   //   z_stream_output_ buffer.
120   // z_stream_->avail_out:
121   //   Number of free bytes available at write location.
122   std::unique_ptr<z_stream> z_stream_;
123 
124   // Adds `data` to `z_stream_input_`.
125   // Throws if `data.size()` > AvailableInputSpace().
126   void AddToInputBuffer(StringPiece data);
127 
128   // Returns the total space available in z_input_stream_ buffer.
129   int32 AvailableInputSpace() const;
130 
131   // Deflate contents in z_stream_input_ and store results in z_stream_output_.
132   // The contents of output stream are written to file if more space is needed.
133   // On successful termination it is assured that:
134   // - z_stream_->avail_in == 0
135   // - z_stream_->avail_out > 0
136   //
137   // Note: This method does not flush contents to file.
138   // Returns non-ok status if writing contents to file fails.
139   Status DeflateBuffered(int flush_mode);
140 
141   // Appends contents of `z_stream_output_` to `file_`.
142   // Returns non-OK status if writing to file fails.
143   Status FlushOutputBufferToFile();
144 
145   // Calls `deflate()` and returns DataLoss Status if it failed.
146   Status Deflate(int flush);
147 
IsSyncOrFullFlush(uint8 flush_mode)148   static bool IsSyncOrFullFlush(uint8 flush_mode) {
149     return flush_mode == Z_SYNC_FLUSH || flush_mode == Z_FULL_FLUSH;
150   }
151 
152   TF_DISALLOW_COPY_AND_ASSIGN(ZlibOutputBuffer);
153 };
154 
155 }  // namespace io
156 }  // namespace tensorflow
157 
158 #endif  // TENSORFLOW_CORE_LIB_IO_COMPRESSED_OUTPUTBUFFER_H_
159