1#!/usr/bin/env python
2#
3# Copyright 2017 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Compression support for apitools."""
18
19from collections import deque
20
21from apitools.base.py import gzip
22
23__all__ = [
24    'CompressStream',
25]
26
27
28# pylint: disable=invalid-name
29# Note: Apitools only uses the default chunksize when compressing.
def CompressStream(in_stream, length=None, compresslevel=2,
                   chunksize=16777216):

    """Compresses an input stream into a file-like buffer.

    This reads from the input stream until either we've stored at least length
    compressed bytes, or the input stream has been exhausted.

    This supports streams of unknown size.

    Args:
        in_stream: The input stream to read from.
        length: The target number of compressed bytes to buffer in the output
            stream. If length is None, the input stream will be compressed
            until it's exhausted.

            The actual length of the output buffer can vary from the target.
            If the input stream is exhausted, the output buffer may be smaller
            than expected. If the data is incompressible, the target length
            can be exceeded; the maximum overshoot can be calculated to be:

              chunksize + 5 * (floor((chunksize - 1) / 16383) + 1) + 17

            This accounts for additional header data gzip adds. For the default
            16MiB chunksize, this results in the max size of the output buffer
            being:

              length + 16MiB + 5142 bytes

        compresslevel: Optional, defaults to 2. The desired compression level.
        chunksize: Optional, defaults to 16MiB. The chunk size used when
            reading data from the input stream to write into the output
            buffer.

    Returns:
        A file-like output buffer of compressed bytes, the number of bytes read
        from the input stream, and a flag denoting if the input stream was
        exhausted.
    """
    # Total number of bytes consumed from in_stream so far.
    in_read = 0
    # Set once a short read indicates in_stream has no more data.
    in_exhausted = False
    # Compressed output accumulates here; reading from a StreamingBuffer
    # permanently drains it, so memory stays bounded for callers that
    # consume the buffer as they go.
    out_stream = StreamingBuffer()
    # Closing the GzipFile (via the with-block) flushes any remaining
    # compressed data and the gzip trailer into out_stream.
    with gzip.GzipFile(mode='wb',
                       fileobj=out_stream,
                       compresslevel=compresslevel) as compress_stream:
        # Read until we've written at least length bytes to the output stream.
        # NOTE: a length of 0 is falsy, so it behaves like None here
        # (compress until the input is exhausted).
        while not length or out_stream.length < length:
            data = in_stream.read(chunksize)
            data_length = len(data)
            compress_stream.write(data)
            in_read += data_length
            # If we read less than requested, the stream is exhausted.
            # NOTE(review): assumes in_stream.read() only short-reads at EOF
            # (true for regular files and in-memory buffers; confirm for
            # other stream types).
            if data_length < chunksize:
                in_exhausted = True
                break
    return out_stream, in_read, in_exhausted
86
87
class StreamingBuffer(object):

    """Provides a file-like object that writes to a temporary buffer.

    When data is read from the buffer, it is permanently removed. This is
    useful when there are memory constraints preventing the entire buffer from
    being stored in memory.
    """

    def __init__(self):
        # The buffer of byte strings, kept in write order. A deque gives O(1)
        # appends at the right and pops from the left.
        self.__buf = deque()
        # The total number of bytes currently held in __buf.
        self.__size = 0

    def __len__(self):
        return self.__size

    def __nonzero__(self):
        # For 32-bit python2.x, len() cannot exceed a 32-bit number; avoid
        # accidental len() calls from httplib in the form of "if this_object:".
        return bool(self.__size)

    # Python 3 looks up __bool__ rather than __nonzero__; alias it so the
    # len()-avoidance above applies on both major versions.
    __bool__ = __nonzero__

    @property
    def length(self):
        # For 32-bit python2.x, len() cannot exceed a 32-bit number.
        return self.__size

    def write(self, data):
        """Append data to the buffer.

        Gzip can write many 0 byte chunks for highly compressible data;
        skip empty (or None) writes so nothing is stored for them.
        """
        # `None` is falsy, so a plain truthiness test covers both the
        # None check and the empty-chunk check.
        if data:
            self.__buf.append(data)
            self.__size += len(data)

    def read(self, size=None):
        """Read at most size bytes from this buffer.

        Bytes read from this buffer are consumed and are permanently removed.

        Args:
          size: If provided, read no more than size bytes from the buffer.
            Otherwise, this reads the entire buffer.

        Returns:
          The bytes read from this buffer.
        """
        if size is None:
            size = self.__size
        ret_list = []
        while size > 0 and self.__buf:
            data = self.__buf.popleft()
            size -= len(data)
            ret_list.append(data)
        # If the last chunk overshot the request (size went negative), split
        # it and push the unread remainder back onto the front of the buffer.
        if size < 0:
            ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
            self.__buf.appendleft(remainder)
        ret = b''.join(ret_list)
        self.__size -= len(ret)
        return ret
148