1# Copyright 2014 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Helper class for streaming resumable uploads."""
15
16import collections
17import os
18
19from gslib.exception import CommandException
20from gslib.util import GetJsonResumableChunkSize
21
22
class ResumableStreamingJsonUploadWrapper(object):
  """Wraps an input stream in a buffer for resumable uploads.

  This class takes a non-seekable input stream, buffers it, and exposes it
  as a stream with limited seek capabilities such that it can be used in a
  resumable JSON API upload.

  max_buffer_size bytes of buffering is supported: only the most recently
  read max_buffer_size bytes of the wrapped stream can be seeked back to and
  replayed.
  """

  def __init__(self, stream, max_buffer_size, test_small_buffer=False):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      max_buffer_size: Maximum size of internal buffer; should be >= the chunk
          size of the resumable upload API to ensure that at least one full
          chunk write can be replayed in the event of a server error.
      test_small_buffer: Skip check for buffer size vs. chunk size, for testing.

    Raises:
      CommandException if max_buffer_size is smaller than the JSON resumable
      upload chunk size and test_small_buffer is not set.
    """
    self._orig_fp = stream

    if not test_small_buffer and max_buffer_size < GetJsonResumableChunkSize():
      raise CommandException('Resumable streaming upload created with buffer '
                             'size %s, JSON resumable upload chunk size %s. '
                             'Buffer size must be >= JSON resumable upload '
                             'chunk size to ensure that uploads can be '
                             'resumed.' % (max_buffer_size,
                                           GetJsonResumableChunkSize()))

    self._max_buffer_size = max_buffer_size
    # Chunks of already-read data, oldest first. Together they hold the bytes
    # of the wrapped stream in the range [_buffer_start, _buffer_end).
    self._buffer = collections.deque()
    self._buffer_start = 0
    self._buffer_end = 0
    # Logical read position exposed to the caller. It only differs from
    # _buffer_end after a backwards seek.
    self._position = 0

  def _AppendToBuffer(self, new_data):
    """Records bytes just read from the wrapped stream in the replay buffer.

    Afterwards the buffer holds at most the newest max_buffer_size bytes.

    Args:
      new_data: Non-empty data that was just read from the wrapped stream.
    """
    self._buffer.append(new_data)
    self._buffer_end += len(new_data)
    oldest_data = None
    # Drop whole chunks from the front until the buffer fits again.
    while self._buffer_end - self._buffer_start > self._max_buffer_size:
      oldest_data = self._buffer.popleft()
      self._buffer_start += len(oldest_data)
    if oldest_data:
      # Dropping a whole chunk may have removed more than necessary; put the
      # tail of the last dropped chunk back so the buffer is exactly full.
      refill_amount = self._max_buffer_size - (self._buffer_end -
                                               self._buffer_start)
      if refill_amount:
        self._buffer.appendleft(oldest_data[-refill_amount:])
        self._buffer_start -= refill_amount

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped stream.

    Data is served from the internal buffer first if there was a backwards
    seek, and then from the wrapped stream. Everything read from the wrapped
    stream is recorded in the buffer so that it can be replayed later.

    Args:
      size: The amount of bytes to read. If omitted or negative, the entire
          contents of the stream will be read and returned.

    Returns:
      Bytes from the wrapped stream.
    """
    read_all_bytes = size is None or size < 0
    if read_all_bytes:
      # The buffer never holds more than _max_buffer_size bytes, so this is
      # enough to drain whatever is buffered past the current position.
      bytes_remaining = self._max_buffer_size
    else:
      bytes_remaining = size
    data = b''
    buffered_data = []
    if self._position < self._buffer_end:
      # There was a backwards seek, so read from the buffer first.

      # TODO: Performance test to validate if it is worth re-aligning
      # the buffers in this case.  Also, seeking through the buffer for
      # each read on a long catch-up is probably not performant, but we'd
      # need a more complex data structure than a deque to get around this.
      pos_in_buffer = self._buffer_start
      buffer_index = 0
      # First, find the start position in the buffer.
      while pos_in_buffer + len(self._buffer[buffer_index]) < self._position:
        # When this loop exits, buffer_index refers to the chunk that
        # contains (or ends exactly at) self._position, and pos_in_buffer
        # is the stream offset at which that chunk starts.
        pos_in_buffer += len(self._buffer[buffer_index])
        buffer_index += 1

      # Read until we've read enough or we're out of buffer.
      while pos_in_buffer < self._buffer_end and bytes_remaining > 0:
        buffer_len = len(self._buffer[buffer_index])
        # This describes how far into the current buffer self._position is.
        offset_from_position = self._position - pos_in_buffer
        bytes_available_this_buffer = buffer_len - offset_from_position
        read_size = min(bytes_available_this_buffer, bytes_remaining)
        buffered_data.append(
            self._buffer[buffer_index]
            [offset_from_position:offset_from_position + read_size])
        bytes_remaining -= read_size
        pos_in_buffer += buffer_len
        buffer_index += 1
        self._position += read_size

    # At this point we're guaranteed that if there are any bytes left to read,
    # then self._position == self._buffer_end, and we can read from the
    # wrapped stream if needed.
    if read_all_bytes or bytes_remaining:
      if read_all_bytes:
        # TODO: The user is requesting reading until the end of an
        # arbitrary length stream, which is bad because we'll need to return
        # data with no size limits; if the stream is sufficiently long, we
        # could run out of memory. We could break this down into smaller
        # reads and buffer it as we go, but we're still left returning the
        # data all at once to the caller.  We could raise, but for now trust
        # the caller to be sane and have enough memory to hold the remaining
        # stream contents.
        new_data = self._orig_fp.read(size)
      else:
        new_data = self._orig_fp.read(bytes_remaining)
      data_len = len(new_data)
      if data_len:
        self._position += data_len
        # Buffer the data on every path (including read-all) so that
        # _buffer_end keeps tracking the wrapped stream's position and a
        # later seek/replay returns the correct bytes.
        self._AppendToBuffer(new_data)
      if not buffered_data:
        data = new_data
      else:
        buffered_data.append(new_data)
        data = b''.join(buffered_data)
    else:
      # The request was fully satisfied from the buffer (or size was 0).
      data = b''.join(buffered_data)

    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._position

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true since limited seek support exists."""
    return True

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks on the buffered stream.

    Args:
      offset: For os.SEEK_SET, the absolute offset to seek to; must be within
          the buffer bounds. For os.SEEK_END, the number of bytes before the
          end of the stream to seek to; must be non-negative and no larger
          than the amount of data that is buffered.
      whence: Must be os.SEEK_SET or os.SEEK_END. Note that os.SEEK_END
          consumes the wrapped stream through to its end.

    Raises:
      CommandException if an unsupported seek mode or position is used.
    """
    if whence == os.SEEK_SET:
      if offset < self._buffer_start or offset > self._buffer_end:
        raise CommandException('Unable to resume upload because of limited '
                               'buffering available for streaming uploads. '
                               'Offset %s was requested, but only data from '
                               '%s to %s is buffered.' %
                               (offset, self._buffer_start, self._buffer_end))
      # Move to a position within the buffer.
      self._position = offset
    elif whence == os.SEEK_END:
      if offset > self._max_buffer_size:
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Only %s can be buffered.' %
                               (offset, self._max_buffer_size))
      if offset < 0:
        # A negative offset would leave the position past the end of the
        # stream, where no data exists.
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Offset must be non-negative.' % offset)
      # Read to the end and rely on buffering to handle the offset.
      while self.read(self._max_buffer_size):
        pass
      # Now we're at the end. The stream may have been shorter than the
      # requested offset, so make sure the target is actually buffered.
      if self._position - offset < self._buffer_start:
        raise CommandException('Invalid SEEK_END offset %s on streaming '
                               'upload. Only data from %s to %s is '
                               'buffered.' %
                               (offset, self._buffer_start, self._buffer_end))
      self._position -= offset
    else:
      raise CommandException('Invalid seek mode on streaming upload. '
                             '(mode %s, offset %s)' % (whence, offset))

  def close(self):  # pylint: disable=invalid-name
    """Closes the wrapped stream and returns the result of its close()."""
    return self._orig_fp.close()
197