1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_LIB_IO_SNAPPY_SNAPPY_OUTPUTBUFFER_H_
17 #define TENSORFLOW_CORE_LIB_IO_SNAPPY_SNAPPY_OUTPUTBUFFER_H_
18 
#include <cstddef>
#include <memory>
#include <string>

#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/platform.h"
#include "tensorflow/core/platform/snappy.h"
#include "tensorflow/core/platform/types.h"
27 
28 namespace tensorflow {
29 namespace io {
30 
31 // Compresses input data using Snappy (https://github.com/google/snappy) and
32 // writes to `file`.
33 //
34 // The input data is cached in a buffer of size `input_buffer_bytes`. When the
35 // buffer does not have enough available space to fit new data (in the call to
36 // `Write`), the contents of the buffer are compressed and sent to the output
37 // buffer.
38 //
39 // The compressed output is buffered in a buffer of size `output_buffer_bytes`
40 // which gets flushed to file when full.
41 //
42 // Output file format:
43 // The output file consists of a sequence of compressed blocks. Each block
44 // starts with a 4 byte header which stores the length (in bytes) of the
45 // _compressed_ block _excluding_ this header. The compressed
46 // block (excluding the 4 byte header) is a valid snappy block and can directly
47 // be uncompressed using Snappy_Uncompress.
48 class SnappyOutputBuffer : public WritableFile {
49  public:
50   // Create an SnappyOutputBuffer for `file` with two buffers that cache the
51   // 1. input data to be deflated
52   // 2. the deflated output
53   // with sizes `input_buffer_bytes` and `output_buffer_bytes` respectively.
54   // Does not take ownership of `file`.
55   SnappyOutputBuffer(WritableFile* file, int32 input_buffer_bytes,
56                      int32 output_buffer_bytes);
57 
58   // Per convention, the dtor does not call Flush() or Close(). We expect the
59   // caller to call those manually when done.
60   ~SnappyOutputBuffer() override;
61 
62   // Adds `data` to the compression pipeline.
63   //
64   // The input data is buffered internally and will be written to disk at a
65   // later time. To immediately write contents to file call `Flush()`.
66   Status Append(StringPiece data) override;
67 
68 #if defined(TF_CORD_SUPPORT)
69   Status Append(const absl::Cord& cord) override;
70 #endif
71 
72   // Compresses any buffered input and writes all output to file. This must be
73   // called before the destructor to avoid any data loss.
74   //
75   // Contrary to `Flush()` this informs snappy that it should not expect any
76   // further input.
77   //
78   // After calling this, any further calls to `Write()`, `Flush()` or `Close()`
79   // will fail.
80   Status Close() override;
81 
82   // Returns the name of the underlying file.
83   Status Name(StringPiece* result) const override;
84 
85   // Deflates any cached input, writes all output to file and syncs it.
86   Status Sync() override;
87 
88   // Returns the write position in the underlying file. The position does not
89   // reflect buffered, un-flushed data.
90   Status Tell(int64* position) override;
91 
92   // Adds `data` to the compression pipeline.
93   //
94   // The input data is buffered in `input_buffer_` and is compressed in bulk
95   // when the buffer gets full. The compressed output may not be immediately
96   // written to file but rather buffered in `output_buffer_` and gets written
97   // to file when the buffer is full.
98   //
99   // To immediately write contents to file call `Flush()`.
100   Status Write(StringPiece data);
101 
102   // Compresses any cached input and writes all output to file. This must be
103   // called before the destructor to avoid any data loss.
104   Status Flush();
105 
106  private:
107   // Appends `data` to `input_buffer_`.
108   // Throws if `data.size()` > AvailableInputSpace().
109   void AddToInputBuffer(StringPiece data);
110 
111   // Appends `data` to `output_buffer_`. Flushes buffer contents to file when
112   // buffer gets full.
113   Status AddToOutputBuffer(const char* data, size_t length);
114 
115   // Returns the total space available in `input_buffer_`.
116   int32 AvailableInputSpace() const;
117 
118   // Deflate contents in input_buffer_ and store results in output_buffer_.
119   // The contents of output stream are written to file if more space is needed.
120   //
121   // Note: This method does not flush contents to file.
122   // Returns non-ok status if writing contents to file fails.
123   Status DeflateBuffered();
124 
125   // Appends contents of `output_buffer_` to `file_`.
126   // Returns non-OK status if writing to file fails.
127   Status FlushOutputBufferToFile();
128 
129   // Compresses `avail_in_` bytes at `next_in_` location in `input_buffer_` and
130   // writes the results to output using `AddToOutputBuffer`.
131   // Returns non-OK status if writing to file failed.
132   Status Deflate();
133 
134   WritableFile* file_;  // Not owned
135 
136   // Buffer for storing contents read from input `file_`.
137   // TODO(srbs): Consider using circular buffers. That would greatly simplify
138   // the implementation.
139   std::unique_ptr<char[]> input_buffer_;
140   size_t input_buffer_capacity_;
141   char* next_in_;
142   size_t avail_in_ = 0;
143 
144   // Buffer for storing deflated contents of `file_`.
145   std::unique_ptr<char[]> output_buffer_;
146   size_t output_buffer_capacity_;
147   char* next_out_;
148   size_t avail_out_;
149 
150   TF_DISALLOW_COPY_AND_ASSIGN(SnappyOutputBuffer);
151 };
152 
153 }  // namespace io
154 }  // namespace tensorflow
155 
156 #endif  // TENSORFLOW_CORE_LIB_IO_SNAPPY_SNAPPY_OUTPUTBUFFER_H_
157