1# Copyright 2012 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#    http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing,
10# software distributed under the License is distributed on an
11# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12# either express or implied. See the License for the specific
13# language governing permissions and limitations under the License.
14
15"""Python wrappers for the Google Storage RESTful API."""
16
17
18
19
20
21__all__ = ['ReadBuffer',
22           'StreamingBuffer',
23          ]
24
25import collections
26import os
27import urlparse
28
29from . import api_utils
30from . import common
31from . import errors
32from . import rest_api
33
34try:
35  from google.appengine.api import urlfetch
36  from google.appengine.ext import ndb
37except ImportError:
38  from google.appengine.api import urlfetch
39  from google.appengine.ext import ndb
40
41
42
43def _get_storage_api(retry_params, account_id=None):
44  """Returns storage_api instance for API methods.
45
46  Args:
47    retry_params: An instance of api_utils.RetryParams. If none,
48     thread's default will be used.
49    account_id: Internal-use only.
50
51  Returns:
52    A storage_api instance to handle urlfetch work to GCS.
53    On dev appserver, this instance by default will talk to a local stub
54    unless common.ACCESS_TOKEN is set. That token will be used to talk
55    to the real GCS.
56  """
57
58
59  api = _StorageApi(_StorageApi.full_control_scope,
60                    service_account_id=account_id,
61                    retry_params=retry_params)
62  if common.local_run() and not common.get_access_token():
63    api.api_url = common.local_api_url()
64  if common.get_access_token():
65    api.token = common.get_access_token()
66  return api
67
68
69class _StorageApi(rest_api._RestApi):
70  """A simple wrapper for the Google Storage RESTful API.
71
72  WARNING: Do NOT directly use this api. It's an implementation detail
73  and is subject to change at any release.
74
75  All async methods have similar args and returns.
76
77  Args:
78    path: The path to the Google Storage object or bucket, e.g.
79      '/mybucket/myfile' or '/mybucket'.
80    **kwd: Options for urlfetch. e.g.
81      headers={'content-type': 'text/plain'}, payload='blah'.
82
83  Returns:
84    A ndb Future. When fulfilled, future.get_result() should return
85    a tuple of (status, headers, content) that represents a HTTP response
86    of Google Cloud Storage XML API.
87  """
88
89  api_url = 'https://storage.googleapis.com'
90  read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only'
91  read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write'
92  full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control'
93
94  def __getstate__(self):
95    """Store state as part of serialization/pickling.
96
97    Returns:
98      A tuple (of dictionaries) with the state of this object
99    """
100    return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url})
101
102  def __setstate__(self, state):
103    """Restore state as part of deserialization/unpickling.
104
105    Args:
106      state: the tuple from a __getstate__ call
107    """
108    superstate, localstate = state
109    super(_StorageApi, self).__setstate__(superstate)
110    self.api_url = localstate['api_url']
111
112  @api_utils._eager_tasklet
113  @ndb.tasklet
114  def do_request_async(self, url, method='GET', headers=None, payload=None,
115                       deadline=None, callback=None):
116    """Inherit docs.
117
118    This method translates urlfetch exceptions to more service specific ones.
119    """
120    if headers is None:
121      headers = {}
122    if 'x-goog-api-version' not in headers:
123      headers['x-goog-api-version'] = '2'
124    headers['accept-encoding'] = 'gzip, *'
125    try:
126      resp_tuple = yield super(_StorageApi, self).do_request_async(
127          url, method=method, headers=headers, payload=payload,
128          deadline=deadline, callback=callback)
129    except urlfetch.DownloadError, e:
130      raise errors.TimeoutError(
131          'Request to Google Cloud Storage timed out.', e)
132
133    raise ndb.Return(resp_tuple)
134
135
136  def post_object_async(self, path, **kwds):
137    """POST to an object."""
138    return self.do_request_async(self.api_url + path, 'POST', **kwds)
139
140  def put_object_async(self, path, **kwds):
141    """PUT an object."""
142    return self.do_request_async(self.api_url + path, 'PUT', **kwds)
143
144  def get_object_async(self, path, **kwds):
145    """GET an object.
146
147    Note: No payload argument is supported.
148    """
149    return self.do_request_async(self.api_url + path, 'GET', **kwds)
150
151  def delete_object_async(self, path, **kwds):
152    """DELETE an object.
153
154    Note: No payload argument is supported.
155    """
156    return self.do_request_async(self.api_url + path, 'DELETE', **kwds)
157
158  def head_object_async(self, path, **kwds):
159    """HEAD an object.
160
161    Depending on request headers, HEAD returns various object properties,
162    e.g. Content-Length, Last-Modified, and ETag.
163
164    Note: No payload argument is supported.
165    """
166    return self.do_request_async(self.api_url + path, 'HEAD', **kwds)
167
168  def get_bucket_async(self, path, **kwds):
169    """GET a bucket."""
170    return self.do_request_async(self.api_url + path, 'GET', **kwds)
171
172
173_StorageApi = rest_api.add_sync_methods(_StorageApi)
174
175
176class ReadBuffer(object):
177  """A class for reading Google storage files."""
178
179  DEFAULT_BUFFER_SIZE = 1024 * 1024
180  MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE
181
182  def __init__(self,
183               api,
184               path,
185               buffer_size=DEFAULT_BUFFER_SIZE,
186               max_request_size=MAX_REQUEST_SIZE):
187    """Constructor.
188
189    Args:
190      api: A StorageApi instance.
191      path: Quoted/escaped path to the object, e.g. /mybucket/myfile
192      buffer_size: buffer size. The ReadBuffer keeps
193        one buffer. But there may be a pending future that contains
194        a second buffer. This size must be less than max_request_size.
195      max_request_size: Max bytes to request in one urlfetch.
196    """
197    self._api = api
198    self._path = path
199    self.name = api_utils._unquote_filename(path)
200    self.closed = False
201
202    assert buffer_size <= max_request_size
203    self._buffer_size = buffer_size
204    self._max_request_size = max_request_size
205    self._offset = 0
206    self._buffer = _Buffer()
207    self._etag = None
208
209    get_future = self._get_segment(0, self._buffer_size, check_response=False)
210
211    status, headers, content = self._api.head_object(path)
212    errors.check_status(status, [200], path, resp_headers=headers, body=content)
213    self._file_size = long(common.get_stored_content_length(headers))
214    self._check_etag(headers.get('etag'))
215
216    self._buffer_future = None
217
218    if self._file_size != 0:
219      content, check_response_closure = get_future.get_result()
220      check_response_closure()
221      self._buffer.reset(content)
222      self._request_next_buffer()
223
224  def __getstate__(self):
225    """Store state as part of serialization/pickling.
226
227    The contents of the read buffer are not stored, only the current offset for
228    data read by the client. A new read buffer is established at unpickling.
229    The head information for the object (file size and etag) are stored to
230    reduce startup and ensure the file has not changed.
231
232    Returns:
233      A dictionary with the state of this object
234    """
235    return {'api': self._api,
236            'path': self._path,
237            'buffer_size': self._buffer_size,
238            'request_size': self._max_request_size,
239            'etag': self._etag,
240            'size': self._file_size,
241            'offset': self._offset,
242            'closed': self.closed}
243
244  def __setstate__(self, state):
245    """Restore state as part of deserialization/unpickling.
246
247    Args:
248      state: the dictionary from a __getstate__ call
249
250    Along with restoring the state, pre-fetch the next read buffer.
251    """
252    self._api = state['api']
253    self._path = state['path']
254    self.name = api_utils._unquote_filename(self._path)
255    self._buffer_size = state['buffer_size']
256    self._max_request_size = state['request_size']
257    self._etag = state['etag']
258    self._file_size = state['size']
259    self._offset = state['offset']
260    self._buffer = _Buffer()
261    self.closed = state['closed']
262    self._buffer_future = None
263    if self._remaining() and not self.closed:
264      self._request_next_buffer()
265
266  def __iter__(self):
267    """Iterator interface.
268
269    Note the ReadBuffer container itself is the iterator. It's
270    (quote PEP0234)
271    'destructive: they consumes all the values and a second iterator
272    cannot easily be created that iterates independently over the same values.
273    You could open the file for the second time, or seek() to the beginning.'
274
275    Returns:
276      Self.
277    """
278    return self
279
280  def next(self):
281    line = self.readline()
282    if not line:
283      raise StopIteration()
284    return line
285
286  def readline(self, size=-1):
287    """Read one line delimited by '\n' from the file.
288
289    A trailing newline character is kept in the string. It may be absent when a
290    file ends with an incomplete line. If the size argument is non-negative,
291    it specifies the maximum string size (counting the newline) to return.
292    A negative size is the same as unspecified. Empty string is returned
293    only when EOF is encountered immediately.
294
295    Args:
296      size: Maximum number of bytes to read. If not specified, readline stops
297        only on '\n' or EOF.
298
299    Returns:
300      The data read as a string.
301
302    Raises:
303      IOError: When this buffer is closed.
304    """
305    self._check_open()
306    if size == 0 or not self._remaining():
307      return ''
308
309    data_list = []
310    newline_offset = self._buffer.find_newline(size)
311    while newline_offset < 0:
312      data = self._buffer.read(size)
313      size -= len(data)
314      self._offset += len(data)
315      data_list.append(data)
316      if size == 0 or not self._remaining():
317        return ''.join(data_list)
318      self._buffer.reset(self._buffer_future.get_result())
319      self._request_next_buffer()
320      newline_offset = self._buffer.find_newline(size)
321
322    data = self._buffer.read_to_offset(newline_offset + 1)
323    self._offset += len(data)
324    data_list.append(data)
325
326    return ''.join(data_list)
327
328  def read(self, size=-1):
329    """Read data from RAW file.
330
331    Args:
332      size: Number of bytes to read as integer. Actual number of bytes
333        read is always equal to size unless EOF is reached. If size is
334        negative or unspecified, read the entire file.
335
336    Returns:
337      data read as str.
338
339    Raises:
340      IOError: When this buffer is closed.
341    """
342    self._check_open()
343    if not self._remaining():
344      return ''
345
346    data_list = []
347    while True:
348      remaining = self._buffer.remaining()
349      if size >= 0 and size < remaining:
350        data_list.append(self._buffer.read(size))
351        self._offset += size
352        break
353      else:
354        size -= remaining
355        self._offset += remaining
356        data_list.append(self._buffer.read())
357
358        if self._buffer_future is None:
359          if size < 0 or size >= self._remaining():
360            needs = self._remaining()
361          else:
362            needs = size
363          data_list.extend(self._get_segments(self._offset, needs))
364          self._offset += needs
365          break
366
367        if self._buffer_future:
368          self._buffer.reset(self._buffer_future.get_result())
369          self._buffer_future = None
370
371    if self._buffer_future is None:
372      self._request_next_buffer()
373    return ''.join(data_list)
374
375  def _remaining(self):
376    return self._file_size - self._offset
377
378  def _request_next_buffer(self):
379    """Request next buffer.
380
381    Requires self._offset and self._buffer are in consistent state.
382    """
383    self._buffer_future = None
384    next_offset = self._offset + self._buffer.remaining()
385    if next_offset != self._file_size:
386      self._buffer_future = self._get_segment(next_offset,
387                                              self._buffer_size)
388
389  def _get_segments(self, start, request_size):
390    """Get segments of the file from Google Storage as a list.
391
392    A large request is broken into segments to avoid hitting urlfetch
393    response size limit. Each segment is returned from a separate urlfetch.
394
395    Args:
396      start: start offset to request. Inclusive. Have to be within the
397        range of the file.
398      request_size: number of bytes to request.
399
400    Returns:
401      A list of file segments in order
402    """
403    if not request_size:
404      return []
405
406    end = start + request_size
407    futures = []
408
409    while request_size > self._max_request_size:
410      futures.append(self._get_segment(start, self._max_request_size))
411      request_size -= self._max_request_size
412      start += self._max_request_size
413    if start < end:
414      futures.append(self._get_segment(start, end-start))
415    return [fut.get_result() for fut in futures]
416
417  @ndb.tasklet
418  def _get_segment(self, start, request_size, check_response=True):
419    """Get a segment of the file from Google Storage.
420
421    Args:
422      start: start offset of the segment. Inclusive. Have to be within the
423        range of the file.
424      request_size: number of bytes to request. Have to be small enough
425        for a single urlfetch request. May go over the logical range of the
426        file.
427      check_response: True to check the validity of GCS response automatically
428        before the future returns. False otherwise. See Yields section.
429
430    Yields:
431      If check_response is True, the segment [start, start + request_size)
432      of the file.
433      Otherwise, a tuple. The first element is the unverified file segment.
434      The second element is a closure that checks response. Caller should
435      first invoke the closure before consuing the file segment.
436
437    Raises:
438      ValueError: if the file has changed while reading.
439    """
440    end = start + request_size - 1
441    content_range = '%d-%d' % (start, end)
442    headers = {'Range': 'bytes=' + content_range}
443    status, resp_headers, content = yield self._api.get_object_async(
444        self._path, headers=headers)
445    def _checker():
446      errors.check_status(status, [200, 206], self._path, headers,
447                          resp_headers, body=content)
448      self._check_etag(resp_headers.get('etag'))
449    if check_response:
450      _checker()
451      raise ndb.Return(content)
452    raise ndb.Return(content, _checker)
453
454  def _check_etag(self, etag):
455    """Check if etag is the same across requests to GCS.
456
457    If self._etag is None, set it. If etag is set, check that the new
458    etag equals the old one.
459
460    In the __init__ method, we fire one HEAD and one GET request using
461    ndb tasklet. One of them would return first and set the first value.
462
463    Args:
464      etag: etag from a GCS HTTP response. None if etag is not part of the
465        response header. It could be None for example in the case of GCS
466        composite file.
467
468    Raises:
469      ValueError: if two etags are not equal.
470    """
471    if etag is None:
472      return
473    elif self._etag is None:
474      self._etag = etag
475    elif self._etag != etag:
476      raise ValueError('File on GCS has changed while reading.')
477
478  def close(self):
479    self.closed = True
480    self._buffer = None
481    self._buffer_future = None
482
483  def __enter__(self):
484    return self
485
486  def __exit__(self, atype, value, traceback):
487    self.close()
488    return False
489
490  def seek(self, offset, whence=os.SEEK_SET):
491    """Set the file's current offset.
492
493    Note if the new offset is out of bound, it is adjusted to either 0 or EOF.
494
495    Args:
496      offset: seek offset as number.
497      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
498        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
499        (seek relative to the end, offset should be negative).
500
501    Raises:
502      IOError: When this buffer is closed.
503      ValueError: When whence is invalid.
504    """
505    self._check_open()
506
507    self._buffer.reset()
508    self._buffer_future = None
509
510    if whence == os.SEEK_SET:
511      self._offset = offset
512    elif whence == os.SEEK_CUR:
513      self._offset += offset
514    elif whence == os.SEEK_END:
515      self._offset = self._file_size + offset
516    else:
517      raise ValueError('Whence mode %s is invalid.' % str(whence))
518
519    self._offset = min(self._offset, self._file_size)
520    self._offset = max(self._offset, 0)
521    if self._remaining():
522      self._request_next_buffer()
523
524  def tell(self):
525    """Tell the file's current offset.
526
527    Returns:
528      current offset in reading this file.
529
530    Raises:
531      IOError: When this buffer is closed.
532    """
533    self._check_open()
534    return self._offset
535
536  def _check_open(self):
537    if self.closed:
538      raise IOError('Buffer is closed.')
539
540  def seekable(self):
541    return True
542
543  def readable(self):
544    return True
545
546  def writable(self):
547    return False
548
549
550class _Buffer(object):
551  """In memory buffer."""
552
553  def __init__(self):
554    self.reset()
555
556  def reset(self, content='', offset=0):
557    self._buffer = content
558    self._offset = offset
559
560  def read(self, size=-1):
561    """Returns bytes from self._buffer and update related offsets.
562
563    Args:
564      size: number of bytes to read starting from current offset.
565        Read the entire buffer if negative.
566
567    Returns:
568      Requested bytes from buffer.
569    """
570    if size < 0:
571      offset = len(self._buffer)
572    else:
573      offset = self._offset + size
574    return self.read_to_offset(offset)
575
576  def read_to_offset(self, offset):
577    """Returns bytes from self._buffer and update related offsets.
578
579    Args:
580      offset: read from current offset to this offset, exclusive.
581
582    Returns:
583      Requested bytes from buffer.
584    """
585    assert offset >= self._offset
586    result = self._buffer[self._offset: offset]
587    self._offset += len(result)
588    return result
589
590  def remaining(self):
591    return len(self._buffer) - self._offset
592
593  def find_newline(self, size=-1):
594    """Search for newline char in buffer starting from current offset.
595
596    Args:
597      size: number of bytes to search. -1 means all.
598
599    Returns:
600      offset of newline char in buffer. -1 if doesn't exist.
601    """
602    if size < 0:
603      return self._buffer.find('\n', self._offset)
604    return self._buffer.find('\n', self._offset, self._offset + size)
605
606
607class StreamingBuffer(object):
608  """A class for creating large objects using the 'resumable' API.
609
610  The API is a subset of the Python writable stream API sufficient to
611  support writing zip files using the zipfile module.
612
613  The exact sequence of calls and use of headers is documented at
614  https://developers.google.com/storage/docs/developer-guide#unknownresumables
615  """
616
617  _blocksize = 256 * 1024
618
619  _flushsize = 8 * _blocksize
620
621  _maxrequestsize = 9 * 4 * _blocksize
622
623  def __init__(self,
624               api,
625               path,
626               content_type=None,
627               gcs_headers=None):
628    """Constructor.
629
630    Args:
631      api: A StorageApi instance.
632      path: Quoted/escaped path to the object, e.g. /mybucket/myfile
633      content_type: Optional content-type; Default value is
634        delegate to Google Cloud Storage.
635      gcs_headers: additional gs headers as a str->str dict, e.g
636        {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
637    Raises:
638      IOError: When this location can not be found.
639    """
640    assert self._maxrequestsize > self._blocksize
641    assert self._maxrequestsize % self._blocksize == 0
642    assert self._maxrequestsize >= self._flushsize
643
644    self._api = api
645    self._path = path
646
647    self.name = api_utils._unquote_filename(path)
648    self.closed = False
649
650    self._buffer = collections.deque()
651    self._buffered = 0
652    self._written = 0
653    self._offset = 0
654
655    headers = {'x-goog-resumable': 'start'}
656    if content_type:
657      headers['content-type'] = content_type
658    if gcs_headers:
659      headers.update(gcs_headers)
660    status, resp_headers, content = self._api.post_object(path, headers=headers)
661    errors.check_status(status, [201], path, headers, resp_headers,
662                        body=content)
663    loc = resp_headers.get('location')
664    if not loc:
665      raise IOError('No location header found in 201 response')
666    parsed = urlparse.urlparse(loc)
667    self._path_with_token = '%s?%s' % (self._path, parsed.query)
668
669  def __getstate__(self):
670    """Store state as part of serialization/pickling.
671
672    The contents of the write buffer are stored. Writes to the underlying
673    storage are required to be on block boundaries (_blocksize) except for the
674    last write. In the worst case the pickled version of this object may be
675    slightly larger than the blocksize.
676
677    Returns:
678      A dictionary with the state of this object
679
680    """
681    return {'api': self._api,
682            'path': self._path,
683            'path_token': self._path_with_token,
684            'buffer': self._buffer,
685            'buffered': self._buffered,
686            'written': self._written,
687            'offset': self._offset,
688            'closed': self.closed}
689
690  def __setstate__(self, state):
691    """Restore state as part of deserialization/unpickling.
692
693    Args:
694      state: the dictionary from a __getstate__ call
695    """
696    self._api = state['api']
697    self._path_with_token = state['path_token']
698    self._buffer = state['buffer']
699    self._buffered = state['buffered']
700    self._written = state['written']
701    self._offset = state['offset']
702    self.closed = state['closed']
703    self._path = state['path']
704    self.name = api_utils._unquote_filename(self._path)
705
706  def write(self, data):
707    """Write some bytes.
708
709    Args:
710      data: data to write. str.
711
712    Raises:
713      TypeError: if data is not of type str.
714    """
715    self._check_open()
716    if not isinstance(data, str):
717      raise TypeError('Expected str but got %s.' % type(data))
718    if not data:
719      return
720    self._buffer.append(data)
721    self._buffered += len(data)
722    self._offset += len(data)
723    if self._buffered >= self._flushsize:
724      self._flush()
725
726  def flush(self):
727    """Flush as much as possible to GCS.
728
729    GCS *requires* that all writes except for the final one align on
730    256KB boundaries. So the internal buffer may still have < 256KB bytes left
731    after flush.
732    """
733    self._check_open()
734    self._flush(finish=False)
735
736  def tell(self):
737    """Return the total number of bytes passed to write() so far.
738
739    (There is no seek() method.)
740    """
741    return self._offset
742
743  def close(self):
744    """Flush the buffer and finalize the file.
745
746    When this returns the new file is available for reading.
747    """
748    if not self.closed:
749      self.closed = True
750      self._flush(finish=True)
751      self._buffer = None
752
753  def __enter__(self):
754    return self
755
756  def __exit__(self, atype, value, traceback):
757    self.close()
758    return False
759
760  def _flush(self, finish=False):
761    """Internal API to flush.
762
763    Buffer is flushed to GCS only when the total amount of buffered data is at
764    least self._blocksize, or to flush the final (incomplete) block of
765    the file with finish=True.
766    """
767    while ((finish and self._buffered >= 0) or
768           (not finish and self._buffered >= self._blocksize)):
769      tmp_buffer = []
770      tmp_buffer_len = 0
771
772      excess = 0
773      while self._buffer:
774        buf = self._buffer.popleft()
775        size = len(buf)
776        self._buffered -= size
777        tmp_buffer.append(buf)
778        tmp_buffer_len += size
779        if tmp_buffer_len >= self._maxrequestsize:
780          excess = tmp_buffer_len - self._maxrequestsize
781          break
782        if not finish and (
783            tmp_buffer_len % self._blocksize + self._buffered <
784            self._blocksize):
785          excess = tmp_buffer_len % self._blocksize
786          break
787
788      if excess:
789        over = tmp_buffer.pop()
790        size = len(over)
791        assert size >= excess
792        tmp_buffer_len -= size
793        head, tail = over[:-excess], over[-excess:]
794        self._buffer.appendleft(tail)
795        self._buffered += len(tail)
796        if head:
797          tmp_buffer.append(head)
798          tmp_buffer_len += len(head)
799
800      data = ''.join(tmp_buffer)
801      file_len = '*'
802      if finish and not self._buffered:
803        file_len = self._written + len(data)
804      self._send_data(data, self._written, file_len)
805      self._written += len(data)
806      if file_len != '*':
807        break
808
809  def _send_data(self, data, start_offset, file_len):
810    """Send the block to the storage service.
811
812    This is a utility method that does not modify self.
813
814    Args:
815      data: data to send in str.
816      start_offset: start offset of the data in relation to the file.
817      file_len: an int if this is the last data to append to the file.
818        Otherwise '*'.
819    """
820    headers = {}
821    end_offset = start_offset + len(data) - 1
822
823    if data:
824      headers['content-range'] = ('bytes %d-%d/%s' %
825                                  (start_offset, end_offset, file_len))
826    else:
827      headers['content-range'] = ('bytes */%s' % file_len)
828
829    status, response_headers, content = self._api.put_object(
830        self._path_with_token, payload=data, headers=headers)
831    if file_len == '*':
832      expected = 308
833    else:
834      expected = 200
835    errors.check_status(status, [expected], self._path, headers,
836                        response_headers, content,
837                        {'upload_path': self._path_with_token})
838
839  def _get_offset_from_gcs(self):
840    """Get the last offset that has been written to GCS.
841
842    This is a utility method that does not modify self.
843
844    Returns:
845      an int of the last offset written to GCS by this upload, inclusive.
846      -1 means nothing has been written.
847    """
848    headers = {'content-range': 'bytes */*'}
849    status, response_headers, content = self._api.put_object(
850        self._path_with_token, headers=headers)
851    errors.check_status(status, [308], self._path, headers,
852                        response_headers, content,
853                        {'upload_path': self._path_with_token})
854    val = response_headers.get('range')
855    if val is None:
856      return -1
857    _, offset = val.rsplit('-', 1)
858    return int(offset)
859
860  def _force_close(self, file_length=None):
861    """Close this buffer on file_length.
862
863    Finalize this upload immediately on file_length.
864    Contents that are still in memory will not be uploaded.
865
866    This is a utility method that does not modify self.
867
868    Args:
869      file_length: file length. Must match what has been uploaded. If None,
870        it will be queried from GCS.
871    """
872    if file_length is None:
873      file_length = self._get_offset_from_gcs() + 1
874    self._send_data('', 0, file_length)
875
876  def _check_open(self):
877    if self.closed:
878      raise IOError('Buffer is closed.')
879
880  def seekable(self):
881    return False
882
883  def readable(self):
884    return False
885
886  def writable(self):
887    return True
888