#!/usr/bin/env python
#
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Compression support for apitools."""

from collections import deque

from apitools.base.py import gzip

__all__ = [
    'CompressStream',
]


# pylint: disable=invalid-name
# Note: Apitools only uses the default chunksize when compressing.
def CompressStream(in_stream, length=None, compresslevel=2,
                   chunksize=16777216):

    """Compresses an input stream into a file-like buffer.

    This reads from the input stream until either we've stored at least length
    compressed bytes, or the input stream has been exhausted.

    This supports streams of unknown size.

    Args:
        in_stream: The input stream to read from.
        length: The target number of compressed bytes to buffer in the output
            stream. If length is None, the input stream will be compressed
            until it's exhausted.

            The actual length of the output buffer can vary from the target.
            If the input stream is exhausted, the output buffer may be smaller
            than expected. If the data is incompressible, the amount by which
            the target length can be exceeded is:

              chunksize + 5 * (floor((chunksize - 1) / 16383) + 1) + 17

            This accounts for additional header data gzip adds. For the
            default 16MiB chunksize, this results in the max size of the
            output buffer being:

              length + 16MiB + 5142 bytes

        compresslevel: Optional, defaults to 2. The desired compression level.
        chunksize: Optional, defaults to 16MiB. The chunk size used when
            reading data from the input stream to write into the output
            buffer.

    Returns:
        A file-like output buffer of compressed bytes, the number of bytes
        read from the input stream, and a flag denoting if the input stream
        was exhausted.
    """
    in_read = 0
    in_exhausted = False
    out_stream = StreamingBuffer()
    with gzip.GzipFile(mode='wb',
                       fileobj=out_stream,
                       compresslevel=compresslevel) as compress_stream:
        # Read until we've written at least length bytes to the output stream.
        while not length or out_stream.length < length:
            data = in_stream.read(chunksize)
            data_length = len(data)
            compress_stream.write(data)
            in_read += data_length
            # If we read less than requested, the stream is exhausted.
            if data_length < chunksize:
                in_exhausted = True
                break
    return out_stream, in_read, in_exhausted


class StreamingBuffer(object):

    """Provides a file-like object that writes to a temporary buffer.

    When data is read from the buffer, it is permanently removed. This is
    useful when there are memory constraints preventing the entire buffer from
    being stored in memory.
    """

    def __init__(self):
        # The buffer of byte arrays.
        self.__buf = deque()
        # The number of bytes in __buf.
        self.__size = 0

    def __len__(self):
        return self.__size

    def __nonzero__(self):
        # For 32-bit python2.x, len() cannot exceed a 32-bit number; avoid
        # accidental len() calls from httplib in the form of
        # "if this_object:".
        return bool(self.__size)

    # On Python 3, truth testing calls __bool__, not __nonzero__; without
    # this alias the interpreter falls back to __len__, reintroducing exactly
    # the accidental len() call the comment above tries to avoid.
    __bool__ = __nonzero__

    @property
    def length(self):
        # For 32-bit python2.x, len() cannot exceed a 32-bit number.
        return self.__size

    def write(self, data):
        # Gzip can write many 0 byte chunks for highly compressible data.
        # Prevent them from being added internally. An empty (or None) chunk
        # is falsy, so a plain truth test covers both cases.
        if data:
            self.__buf.append(data)
            self.__size += len(data)

    def read(self, size=None):
        """Read at most size bytes from this buffer.

        Bytes read from this buffer are consumed and are permanently removed.

        Args:
            size: If provided, read no more than size bytes from the buffer.
                Otherwise, this reads the entire buffer.

        Returns:
            The bytes read from this buffer.
        """
        if size is None:
            size = self.__size
        ret_list = []
        while size > 0 and self.__buf:
            data = self.__buf.popleft()
            size -= len(data)
            ret_list.append(data)
        # If the last chunk overshot the request, split it and push the
        # remainder back onto the front of the buffer.
        if size < 0:
            ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
            self.__buf.appendleft(remainder)
        ret = b''.join(ret_list)
        self.__size -= len(ret)
        return ret