# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper class for streaming resumable uploads."""

import collections
import os

from gslib.exception import CommandException
from gslib.util import GetJsonResumableChunkSize


class ResumableStreamingJsonUploadWrapper(object):
  """Wraps an input stream in a buffer for resumable uploads.

  This class takes a non-seekable input stream, buffers it, and exposes it
  as a stream with limited seek capabilities such that it can be used in a
  resumable JSON API upload.

  Up to max_buffer_size bytes of the most recently read data are retained, so
  the position can be moved backwards by at most that many bytes from the
  furthest point read so far.
  """

  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
    """Initializes the wrapper.

    Args:
      stream: Input stream. Only its read() and close() methods are used.
      max_buffer_size: Maximum size of internal buffer; should be >= the chunk
          size of the resumable upload API to ensure that at least one full
          chunk write can be replayed in the event of a server error.
      test_small_buffer: Skip check for buffer size vs. chunk size, for testing.

    Raises:
      CommandException: if test_small_buffer is False and max_buffer_size is
          smaller than the JSON resumable upload chunk size.
    """
    self._orig_fp = stream

    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
      raise CommandException('Resumable streaming upload created with buffer '
                             'size %s, JSON resumable upload chunk size %s. '
                             'Buffer size must be >= JSON resumable upload '
                             'chunk size to ensure that uploads can be '
                             'resumed.' % (max_buffer_size,
                                           GetJsonResumableChunkSize()))

    self._max_buffer_size = max_buffer_size
    # Chunks of previously read stream data; together they cover the stream
    # offsets [_buffer_start, _buffer_end) and never exceed max_buffer_size.
    self._buffer = collections.deque()
    self._buffer_start = 0
    self._buffer_end = 0
    # Logical read position; kept within [_buffer_start, _buffer_end].
    self._position = 0

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped stream.

    If the position was moved backwards by seek(), buffered data is replayed
    first; any remaining bytes are then read from the wrapped stream and
    recorded in the buffer so they can be replayed later.

    Args:
      size: The amount of bytes to read. If omitted, None, or negative, the
          entire remaining contents of the stream will be read and returned.

    Returns:
      Bytes from the wrapped stream; b'' at end of stream or when size is 0.
    """
    read_all_bytes = size is None or size < 0
    if read_all_bytes:
      # The buffer never holds more than max_buffer_size bytes, so this is
      # always enough to drain whatever is buffered past the current position.
      bytes_remaining = self._max_buffer_size
    else:
      bytes_remaining = size
    buffered_data = []
    if self._position < self._buffer_end:
      # There was a backwards seek, so read from the buffer first.

      # TODO: Performance test to validate if it is worth re-aligning
      # the buffers in this case. Also, seeking through the buffer for
      # each read on a long catch-up is probably not performant, but we'd
      # need a more complex data structure than a deque to get around this.
      pos_in_buffer = self._buffer_start
      buffer_index = 0
      # First, find the start position in the buffer.
      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
        # When this loop exits, buffer_index will refer to a buffer that
        # overlaps self._position (or ends exactly at it), and
        # pos_in_buffer will be <= self._position.
        pos_in_buffer += len(self._buffer[buffer_index])
        buffer_index += 1

      # Read until we've read enough or we're out of buffer.
      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
        buffer_len = len(self._buffer[buffer_index])
        # This describes how far into the current buffer self._position is.
        offset_from_position = self._position - pos_in_buffer
        bytes_available_this_buffer = buffer_len - offset_from_position
        read_size = min(bytes_available_this_buffer, bytes_remaining)
        buffered_data.append(
            self._buffer[buffer_index]
            [offset_from_position:offset_from_position + read_size])
        bytes_remaining -= read_size
        pos_in_buffer += buffer_len
        buffer_index += 1
        self._position += read_size

    # At this point we're guaranteed that if there are any bytes left to read,
    # then self._position == self._buffer_end, and we can read from the
    # wrapped stream if needed.
    if read_all_bytes:
      # TODO: The user is requesting reading until the end of an
      # arbitrary length stream, which is bad: we'll need to return data
      # with no size limits; if the stream is sufficiently long, we could run
      # out of memory. We could break this down into smaller reads and
      # buffer it as we go, but we're still left returning the data all at
      # once to the caller. We could raise, but for now trust the caller to
      # be sane and have enough memory to hold the remaining stream contents.
      new_data = self._orig_fp.read(size)
    elif bytes_remaining:
      new_data = self._orig_fp.read(bytes_remaining)
    else:
      new_data = b''

    if new_data:
      self._position += len(new_data)
      # Buffer the new data in both paths so that _position never runs past
      # _buffer_end; otherwise tell() would report a position that seek()
      # rejects, and replaying after a backwards seek would lose data.
      self._BufferData(new_data)

    if not buffered_data:
      return new_data
    buffered_data.append(new_data)
    return b''.join(buffered_data)

  def _BufferData(self, new_data):
    """Appends newly read data to the buffer and trims it to max_buffer_size.

    Args:
      new_data: Non-empty data just read from the wrapped stream, which starts
          at stream offset self._buffer_end.
    """
    self._buffer.append(new_data)
    self._buffer_end += len(new_data)
    oldest_data = None
    while self._buffer_end - self._buffer_start > self._max_buffer_size:
      oldest_data = self._buffer.popleft()
      self._buffer_start += len(oldest_data)
    if oldest_data:
      # Dropping whole chunks may have discarded more than necessary; put
      # back the tail of the last dropped chunk so the buffer stays exactly
      # max_buffer_size bytes long.
      refill_amount = self._max_buffer_size - (self._buffer_end -
                                               self._buffer_start)
      if refill_amount:
        self._buffer.appendleft(oldest_data[-refill_amount:])
        self._buffer_start -= refill_amount

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._position

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true since limited seek support exists."""
    return True

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks on the buffered stream.

    Args:
      offset: With os.SEEK_SET, the absolute position to seek to; it must be
          within the buffer bounds. With os.SEEK_END, the non-negative number
          of bytes before the end of the stream (note: a positive distance,
          unlike io.IOBase.seek); it must not exceed the buffer size.
      whence: os.SEEK_SET or os.SEEK_END.

    Raises:
      CommandException if an unsupported seek mode or position is used.
    """
    if whence == os.SEEK_SET:
      if offset < self._buffer_start or offset > self._buffer_end:
        raise CommandException('Unable to resume upload because of limited '
                               'buffering available for streaming uploads. '
                               'Offset %s was requested, but only data from '
                               '%s to %s is buffered.' %
                               (offset, self._buffer_start, self._buffer_end))
      # Move to a position within the buffer.
      self._position = offset
    elif whence == os.SEEK_END:
      if offset > self._max_buffer_size:
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Only %s can be buffered.' %
                               (offset, self._max_buffer_size))
      # Read to the end and rely on buffering to handle the offset.
      while self.read(self._max_buffer_size):
        pass
      # Now we're at the end. Validate the target like an absolute seek so a
      # negative offset, or one larger than the whole stream, cannot leave
      # the position outside the buffered range.
      self.seek(self._position - offset, os.SEEK_SET)
    else:
      raise CommandException('Invalid seek mode on streaming upload. '
                             '(mode %s, offset %s)' % (whence, offset))

  def close(self):  # pylint: disable=invalid-name
    """Closes the wrapped stream and returns its close() result."""
    return self._orig_fp.close()